极客实验室是极客国际公园旗下为未来而构建的极客社区;
我们正在构建一个活跃的小众社区,汇聚众多优秀开发者与设计师;
关注极具创新精神的前沿技术&分享交流&项目合作机会等互联网行业服务;
Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见!
Future Vision : Establishment of the Geek Foundation;
GeekParkHub GithubHome:https://github.com/geekparkhub
GeekParkHub GiteeHome:https://gitee.com/geekparkhub
欢迎贡献各领域开源野生Blog&笔记&文章&片段&分享&创想&OpenSource Project&Code&Code Review
🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈 issues: geekparkhub.github.io/issues 🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈
Official Public Email
Group Email:geekparkhub@outlook.com —— hackerparkhub@outlook.com —— hackerpark@hotmail.com
User Email:jeep711.home.@gmail.com —— jeep-711@outlook.com
System Email:systemhub-711@outlook.com
Service Email:servicehub-711@outlook.com
Spark是一种基于内存快速 / 通用 / 可扩展大数据分析引擎.
Spark在2009年诞生于(UC Berkeley AMP Lab)加州大学伯克利分校AMP实验室,Spark是使用内存计算的开源大数据并行计算框架,可以应对复杂的大数据处理场景,2013年Spark成为Apache基金会旗下顶级项目.
Spark内核是由Scala编程语言开发,同时也提供了Java/Python/R语言等开发编程接口.
1.Spark Core : 实现了Spark基本功能,包含任务调度 / 内存管理 / 错误恢复 / 与存储系统交互等模块,Spark Core中还包含了对弹性分布式数据集(Resilient Distributed DataSet,简称RDD)API定义.
2.Spark SQL : 是Spark用来操作结构化数据程序包,通过Spark SQL,可以使用SQL或者Apache Hive版本的SQL方言(HQL)来查询数据,Spark SQL支持多种数据源,比如Hive表、Parquet以及JSON等.
3.Spark Streaming : 是Spark提供对实时数据进行流式计算的组件,提供了用来操作数据流的API,并且与Spark Core中的RDD API高度对应.
4.Spark MLlib : 提供常见的机器学习(ML)功能程序库,包括分类、回归、聚类、协同过滤等,还提供了模型评估、数据导入等额外支持功能.
5.集群管理器 : Spark设计为可以高效地在一个计算节点到数千个计算节点之间伸缩计算,为了实现这样要求,同时获得最大灵活性,Spark支持在各种集群管理器(Cluster Manager)上运行,包括Hadoop YARN、ApacheMesos,以及Spark自带简易调度器,叫作独立调度器.
1.快速 : 与Hadoop MapReduce相比,Spark基于内存运算要快100倍以上,基于硬盘运算也要快10倍以上,Spark实现了高效DAG有向无环图执行引擎,可以通过基于内存来高效处理数据流,计算中间结果是存在于内存中.
2.易用 : Spark支持Java、Python和Scala的API,还支持超过80种高级算法,使开发者可以快速构建不同应用,而且Spark支持交互式的Python和Scala的Shell,可以非常方便地在Shell中使用Spark集群来验证解决问题方法.
3.通用性强 : Spark提供了统一解决方案,Spark可以用于批处理 / 交互式查询(SparkSQL) / 实时流处理(SparkStreaming) / 机器学习(SparkMLlib) / 图计算(GraphX),这些不同类型的处理都可以在同一个应用中无缝使用,减少了开发和维护的人力成本和部署平台的物力成本.
4.兼容性 : Spark可以非常方便地与其他的开源产品进行融合,比如Spark可以使用Hadoop YARN和ApacheMesos作为资源管理和调度器,并且可以处理所有Hadoop支持的数据,包括HDFS、HBase等,这对于已经部署Hadoop集群的用户特别重要,因为不需要做任何数据迁移就可以使用Spark强大处理能力.
1.Spark具有丰富组件,可适用于多种复杂应用场景,如SQL查询/机器学习/图形计算/流式计算等,同时Spark可以与Hadoop很好地集成在一起,目前已经有部分主流大数据厂商在发行版Hadoop版本中包含Spark/Cloudera/Hortonworks/MapReduce等.
2.Spark得到了众多大数据公司的支持,这些公司包括Hortonworks、IBM、Intel、Cloudera、MapR、Pivotal、百度、阿里、腾讯、京东、携程、优酷土豆,当前百度的Spark已应用于大搜索、直达号、百度大数据等业务,阿里利用GraphX构建了大规模图计算和图挖掘系统,实现了很多生产系统的推荐算法,腾讯Spark集群达到8000台规模,是当前已知世界上最大的Spark集群.
Spark官方地址 : spark.apache.org
Spark官方下载 : spark.apache.org/downloads.html
Spark官方文档 : spark.apache.org/docs/2.1.1/
解压spark-2.1.1-bin-hadoop2.7.tgz
[root@systemhub511 software]# tar -zxvf spark-2.1.1-bin-hadoop2.7.tgz -C /opt/module/
重命名spark-2.1.1-bin-hadoop2.7
[root@systemhub511 module]# mv spark-2.1.1-bin-hadoop2.7/ spark
Local模式就是运行在单台本地计算机模式,通常就是用于在本地上练手或测试,它可以通过以下集中方式设置Master.
1.local : 所有计算都运行在一个线程当中,没有任何并行计算,通常在本机执行测试代码就用这种模式.
2.local[K] : 指定使用多少个线程来运行计算,比如local[4]就是运行4个Worker线程,通常Cpu有几个Core,就指定几个线程,最大化利用Cpu计算能力.
3.local[*] : 这种模式直接按照Cpu最多Cores来设置线程数量.
1.基本语法
bin/spark-submit \
--class <main-class>
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
2.参数说明
--master: 指定Master地址,默认为Local.
--class: 应用主启动类(如org.apache.spark.examples.SparkPi).
--deploy-mode : 是否发布驱动到worker节点(cluster)或者作为一个本地客户端(client)(default: client)*
--conf : 任意Spark配置属性,格式key=value,如果值包含空格,可以加引号"key=value"
application-jar : 打包好应用jar,包含依赖,URL在集群中全局可见,比如hdfs://共享存储系统,如果是file://path,那么所有节点的path都包含同样的jar包.
application-arguments : 传给main()方法的参数.
--executor-memory 1G : 指定每个executor可用内存为1G
--total-executor-cores 2 : 指定每个executor使用cpu核数为2个
3.求π程序
3.1 求π执行语句
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--executor-memory 1G \
--total-executor-cores 1 \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100
3.2 开始执行任务
[root@systemhub511 spark]# bin/spark-submit \
> --class org.apache.spark.examples.SparkPi \
> --executor-memory 1G \
> --total-executor-cores 1 \
> ./examples/jars/spark-examples_2.11-2.1.1.jar \
> 100
3.3 查看执行结果 | 该算法是利用蒙特·卡罗算法求π
INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 3.059446 s
Pi is roughly 3.1411463141146316
3.4 启动spark-shell
[root@systemhub511 spark]# bin/spark-shell
Spark context Web UI available at http://systemhub511:4040
Spark context available as 'sc' (master = local[*], app id = local-1558677071165).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.1
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_162)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
3.5 通过WebUI查看程序运行 | http://hostname:4040
4.运行WordCount程序
4.1 在spark根目录创建wordcount目录
[root@systemhub511 spark]# mkdir -p input/wordcount
4.2 在wordcount目录创建数据文件 | vim wordcount_001.txt
[root@systemhub511 spark]# cd input/wordcount/
[root@systemhub511 wordcount]# vim wordcount_001.txt
hadoop spark hive
hadoop spark hadoop
hbase flume hive
scala java oozie
4.3 执行WordCount并查看打印结果
scala> sc.textFile("/opt/module/spark/input/wordcount/wordcount_001.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res0: Array[(String, Int)] = Array((scala,1), (spark,2), (hive,2), (hadoop,3), (oozie,1), (flume,1), (java,1), (hbase,1))
scala>
4.4 将WordCount执行结果输出至本地文件
scala> sc.textFile("/opt/module/spark/input/wordcount/wordcount_001.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("./output/wordcount/")
4.5 查看文件结果
[root@systemhub511 spark]# cd output/wordcount/
[root@systemhub511 wordcount]# ll
total 4
-rw-r--r--. 1 root root 79 May 24 14:48 part-00000
-rw-r--r--. 1 root root 0 May 24 14:48 _SUCCESS
[root@systemhub511 wordcount]# cat part-00000
(scala,1)
(spark,2)
(hive,2)
(hadoop,3)
(oozie,1)
(flume,1)
(java,1)
(hbase,1)
[root@systemhub511 wordcount]#
提交任务分析 | Spark通用运行简易流程
提交任务角色 : Driver (驱动器) & Executor (执行器)
1. Driver (驱动器)
Spark驱动器是执行开发程序中main方法进程,它负责开发人员编写用来创建SparkContext / 创建RDD,以及进行RDD转化操作和行动操作代码的执行,如果使用spark shell,那么当启动Spark shell的时候,系统后台自启一个Spark驱动器程序,就是在Spark shell中预加载一个叫作sc的SparkContext对象,如果驱动器程序终止,那么Spark应用也就结束了.
1.1 Driver主要负责 : 1.将开发者程序转为任务. -> 2.跟踪Executor运行状况. -> 3.为执行器节点调度任务. -> 4.WebUI展示应用运行状况.
2. Executor (执行器)
Spark Executor是一个工作进程,负责在Spark作业中运行任务,任务间相互独立,Spark应用启动时,Executor节点被同时启动,并且始终伴随着整个Spark应用的生命周期而存在,如果有Executor节点发生了故障或崩溃,Spark应用也可以继续执行,会将出错节点上任务调度到其他Executor节点上继续运行.
2.2 Executor主要负责 : 1.负责运行组成Spark应用任务,并将结果返回给驱动器进程. -> 2.通过自身的块管理器(Block Manager)为开发者程序中要求缓存RDD提供内存式存储,RDD是直接缓存在Executor进程内,因此任务可以在运行时充分利用缓存数据加速运算.
| 参数列表 | 参数描述 |
textFile("input") | 读取本地文件input文件夹数据 |
flatMap(_.split(" ")) | 压平操作,按照空格分割符将一行数据映射成一个个单词 |
map((_,1)) | 对每一个元素操作,将单词映射为元组 |
reduceByKey(_+_) | 按照key将值进行聚合相加 |
collect | 将数据收集到Driver端展示 |
WordCount 程序分析
由Master+Slave构建而成的Spark集群,Spark运行在集群中.
Standalone运行模式
1.在spark根目录下进入conf目录
[root@systemhub511 spark]# cd conf/
2.修改配置文件名称 | slaves & spark-env.sh
[root@systemhub511 conf]# mv slaves.template slaves
[root@systemhub511 conf]# mv spark-env.sh.template spark-env.sh
3.修改slave文件,添加work节点 | vim slaves
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# A Spark Worker will be started on each of the machines listed below.
systemhub511
systemhub611
systemhub711
4.修改spark-env.sh文件 | vim spark-env.sh
# Options for the daemons used in the standalone deploy mode
SPARK_MASTER_HOST=systemhub511
SPARK_MASTER_PORT=7077
5.将spark分发至其他节点集群
[root@systemhub511 module]# scp -r spark/ root@systemhub611:/opt/module/
[root@systemhub511 module]# scp -r spark/ root@systemhub711:/opt/module/
6.启动spark集群 | sbin/start-all.sh
[root@systemhub511 spark]# sbin/start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /opt/module/spark/logs/spark-root-org.apache.spark.deploy.master.Master-1-systemhub511.out
systemhub711: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-systemhub711.out
systemhub611: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-systemhub611.out
systemhub511: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-systemhub511.out
[root@systemhub511 spark]#
7.查看集群节点状态
[root@systemhub511 spark]# jps.sh
================ root@systemhub511 All Processes ===========
30651 org.apache.spark.deploy.worker.Worker
30443 org.apache.spark.deploy.master.Master
813 sun.tools.jps.Jps
================ root@systemhub611 All Processes ===========
10369 org.apache.spark.deploy.worker.Worker
11777 sun.tools.jps.Jps
================ root@systemhub711 All Processes ===========
8960 org.apache.spark.deploy.worker.Worker
10364 sun.tools.jps.Jps
[root@systemhub511 spark]#
8.(求π)官方案例
8.1 执行语句 | 指定 spark master
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://systemhub511:7077 \
--executor-memory 1G \
--total-executor-cores 1 \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100
8.2 执行并查看结果
[root@systemhub511 spark]# bin/spark-submit \
> --class org.apache.spark.examples.SparkPi \
> --master spark://systemhub511:7077 \
> --executor-memory 1G \
> --total-executor-cores 1 \
> ./examples/jars/spark-examples_2.11-2.1.1.jar \
> 100
INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 6.478381 s
Pi is roughly 3.1405883140588315
8.3 启动sparkshell,并执行WordCount程序查看结果
参数:--master spark://systemhub511:7077 指定要连接集群master
[root@systemhub511 spark]# bin/spark-shell --master spark://systemhub511:7077
Spark context Web UI available at http://systemhub511:4040
Spark context available as 'sc' (master = spark://systemhub511:7077, app id = app-20190524174512-0001).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.1
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_162)
Type in expressions to have them evaluated.
Type :help for more information.
scala> sc.textFile("/opt/module/spark/input/wordcount/wordcount_001.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res0: Array[(String, Int)] = Array((scala,1), (hive,2), (oozie,1), (java,1), (spark,2), (hadoop,3), (flume,1), (hbase,1))
scala>
8.4 通过WebUI查看程序运行 | http://hostname:8088
8.5 配置历史服务器(JobHistoryServer)
重命名spark-default.conf.template
[root@systemhub511 conf]# mv spark-defaults.conf.template spark-defaults.conf
8.5.1 配置spark-default.conf | vim spark-default.conf
spark.master spark://systemhub511:7077
spark.eventLog.enabled true
spark.eventLog.dir hdfs://systemhub511:9000/directory
8.5.2 配置spark-env.sh | vim spark-env.sh
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=30 -Dspark.history.fs.logDirectory=hdfs://systemhub511:9000/directory"
参数描述 :
spark.eventLog.dir:Application在运行过程中所有信息均记录在该属性指定的路径下.
spark.history.ui.port=18080 WEBUI访问端口号为18080
spark.history.fs.logDirectory=hdfs://systemhub511:9000/directory 配置了该属性后,在start-history-server.sh时就无需再显示指定路径,Spark History Server只展示该指定路径下信息.
spark.history.retainedApplications=30 指定保存Application历史记录个数,如果超过这个值,旧应用程序信息将被删除,这个是内存中应用数,而不是页面上显示应用数.
8.5.3 分发至其他节点集群
[root@systemhub511 module]# scp -r spark/ root@systemhub611:/opt/module/
[root@systemhub511 module]# scp -r spark/ root@systemhub711:/opt/module/
8.5.4 启动Hadoop HDFS
[root@systemhub511 hadoop]# sbin/start-dfs.sh
8.5.5 手动创建HDFS /directory目录
[root@systemhub511 spark]# hadoop fs -mkdir /directory
8.5.6 启动Spark集群
[root@systemhub511 spark]# sbin/start-all.sh
8.5.6 启动Spark历史服务
[root@systemhub511 spark]# sbin/start-history-server.sh
8.5.7 启动sparkshell
[root@systemhub511 spark]# bin/spark-shell --master spark://systemhub511:7077
sc.textFile("/opt/module/spark/input/wordcount/wordcount_001.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
8.5.8 查看历史服务 | http://hostname:18080
1.停止集群所有服务
2.配置spark-env.sh | vim spark-env.sh
# SPARK_MASTER_HOST=systemhub511
# SPARK_MASTER_PORT=7077
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=systemhub511,systemhub611,systemhub711 -Dspark.deploy.zookeeper.dir=/spark"
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=30 -Dspark.history.fs.logDirectory=hdfs://systemhub511:9000/directory"
3.分发至其他节点集群
[root@systemhub511 module]# scp -r spark/ root@systemhub611:/opt/module/
[root@systemhub511 module]# scp -r spark/ root@systemhub711:/opt/module/
4.启动Hadoop HDFS
[root@systemhub511 spark]# /opt/module/hadoop/sbin/start-dfs.sh
5.启动Zookeeper集群
[root@systemhub511 spark]# /opt/module/zookeeper/bin/zkServer.sh start
[root@systemhub611 ~]# /opt/module/zookeeper/bin/zkServer.sh start
[root@systemhub711 ~]# /opt/module/zookeeper/bin/zkServer.sh start
6.在systemhub511启动全部服务节点
[root@systemhub511 spark]# sbin/start-all.sh
7.在systemhub611单独启动master备份节点
[root@systemhub611 ~]# /opt/module/spark/sbin/start-master.sh
8.访问SparkHA集群
[root@systemhub511 spark]# bin/spark-shell --master spark://systemhub511:7077,systemhub611:7077
http://systemhub511:8080 | systemhub511节点状态为ALIVE
http://systemhub611:8080 | systemhub611节点状态为STANDBY
9.故障转移测试
手动杀死systemhub511服务器Master进程,并查看systemhub511是否将任务转移给systemhub611备份节点作为主节点.
9.1 查看集群节点状态
[root@systemhub511 spark]# jps.sh
================ root@systemhub511 All Processes ===========
32242 org.apache.hadoop.hdfs.server.namenode.NameNode
11206 org.apache.spark.deploy.master.Master
11368 org.apache.spark.deploy.worker.Worker
9705 org.apache.zookeeper.server.quorum.QuorumPeerMain
32444 org.apache.hadoop.hdfs.server.datanode.DataNode
5228 sun.tools.jps.Jps
================ root@systemhub611 All Processes ===========
9157 org.apache.spark.deploy.master.Master
8901 org.apache.spark.deploy.worker.Worker
2822 sun.tools.jps.Jps
30214 org.apache.hadoop.hdfs.server.datanode.DataNode
7495 org.apache.zookeeper.server.quorum.QuorumPeerMain
================ root@systemhub711 All Processes ===========
5312 org.apache.spark.deploy.worker.Worker
31568 sun.tools.jps.Jps
26869 org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode
26647 org.apache.hadoop.hdfs.server.datanode.DataNode
4014 org.apache.zookeeper.server.quorum.QuorumPeerMain
[root@systemhub511 spark]#
9.2 Kill systemhub511 Master主节点
[root@systemhub511 spark]# kill -9 11206
9.3 systemhub511节点已宕机 | systemhub611备份节点状态已转化为ALIVE主节点
Spark客户端直接连接Yarn,不需要额外构建Spark集群.
两种模式yarn-client和yarn-cluster,主要区别在于 : Driver程序运行节点
yarn-client : Driver程序运行在客户端,适用于交互调试,立即看到app输出.
yarn-cluster : Driver程序运行在由RM(ResourceManager)启动AP(APPMaster)适用于生产环境.
1.配置spark-env.sh | vim spark-env.sh
YARN_CONF_DIR=/opt/module/hadoop/etc/hadoop
vim spark-defaults.conf
spark.master spark://systemhub511:7077
spark.eventLog.enabled true
spark.eventLog.dir hdfs://systemhub511:9000/directory
spark.yarn.historyServer.address=systemhub511:18080
spark.history.ui.port=18080
vim yarn-site.xml
<!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true-->
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
2.分发至其他节点集群
[root@systemhub511 module]# scp -r spark/ root@systemhub611:/opt/module/
[root@systemhub511 module]# scp -r spark/ root@systemhub711:/opt/module/
3.提交任务到Yarn执行
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.11-2.1.1.jar\
100
Spark客户端直接连接Mesos,不需要额外构建Spark集群,国内应用比较少,更多是运用yarn调度.
| 模式 | 集群数量 | 集群进程 | 所属者 |
| Loacl Mode | 1 | 无 | Spark |
| Standalone Mode | 3 | Master & Worker | Spark |
| Yarn Mode | 1 | Yarn & HDFS | Hadoop |
Spark Shell仅在测试和验证程序时使用的较多,在生产环境中通常会在IDE中编制程序,然后打成jar包提交到集群,最常用是创建Maven工程,利用Maven来管理jar包依赖.
1.JetBrains IntelliJ IDEA New Maven Project | 此过程省略
2.父工程配置信息 | pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.geekparkhub.core.spark</groupId>
<artifactId>spark_server</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>spark-common</module>
</modules>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
</dependency>
</dependencies>
</project>
3.创建子模块 spark-common | 子模块配置信息 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spark_server</artifactId>
<groupId>com.geekparkhub.core.spark</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spark-common</artifactId>
<build>
<finalName>WordCount</finalName>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
4.在spark-common子模块中创建scala源码目录 | Create WordCount.scala
package com.geekparkhub.core.spark.application.wordcount
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* Geek International Park | 极客国际公园
* GeekParkHub | 极客实验室
* Website | https://www.geekparkhub.com/
* Description | Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见
* HackerParkHub | 黑客公园枢纽
* Website | https://www.hackerparkhub.org/
* Description | 以无所畏惧的探索精神 开创未知技术与对技术的崇拜
* GeekDeveloper : JEEP-711
*
* @author system
* <p>
* WordCountApplication
* <p>
*/
object WordCount {
def main(args: Array[String]): Unit = {
/**
* Create SparkConf
* 创建 SparkConf
*/
val sparkConf = new SparkConf().setMaster(args(0)).setAppName("WordCountApplication")
/**
* Create SparkContext
* 创建 SparkContext
*/
val sc = new SparkContext()
/**
* Read file
* 读取文件
*/
val line: RDD[String] = sc.textFile(args(1))
/**
* To flatten
* 压平
*/
val word: RDD[String] = line.flatMap(_.split(" "))
/**
* Word conversion dual group
* 单词转换二元组
*/
val wordAndOne: RDD[(String, Int)] = word.map((_, 1))
/**
* Count the total number of words
* 统计单词总数
*/
val wordCount: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)
/**
* Write out the file
* 写出文件
*/
wordCount.saveAsTextFile(args(2))
/**
* Close resource
* 关闭资源
*/
sc.stop()
}
}
5.将spark-common子模块打至成jar包上传至systemhub511服务器
6.启动HDFS | 在HDFS创建多级目录
[root@systemhub511 ~]# hadoop fs -mkdir -p /core_flow/spark/input/wordcount
7.将本地文件上传至HDFS目录
hadoop fs -put /opt/module/spark/input/wordcount/wordcount_001.txt /core_flow/spark/input/wordcount
8.Yarn执行提交任务至
bin/spark-submit \
--class com.geekparkhub.core.spark.application.wordcount.WordCount \
--master yarn \
./lib_jar/WordCount.jar yarn \
/core_flow/spark/input/wordcount/wordcount_001.txt \
/core_flow/spark/output/wordcount
9.查看任务汇总结果
9.1 hadoop fs -ls -R
[root@systemhub511 spark]# hadoop fs -ls -R /core_flow/spark/output/wordcount/
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
-rw-r--r-- 3 root supergroup /core_flow/spark/output/wordcount/_SUCCESS
-rw-r--r-- 3 root supergroup /core_flow/spark/output/wordcount/part-00000
-rw-r--r-- 3 root supergroup /core_flow/spark/output/wordcount/part-00001
[root@systemhub511 spark]#
9.2 part-00000
[root@systemhub511 spark]# hadoop fs -cat /core_flow/spark/output/wordcount/part-00000
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
(scala,1)
(hive,2)
(oozie,1)
(java,1)
[root@systemhub511 spark]#
9.3 part-00001
[root@systemhub511 spark]# hadoop fs -cat /core_flow/spark/output/wordcount/part-00001
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
(spark,2)
(hadoop,3)
(flume,1)
(hbase,1)
[root@systemhub511 spark]#
RDD(Resilient Distributed Dataset)弹性分布式数据集是Spark中最基本数据抽象,代码中是一个抽象类,它代表一个弹性/不可变/可分区/里面的元素可并行计算的集合.
* Internally, each RDD is characterized by five main properties:
*
* - 1. A list of partitions
* - 2. A function for computing each split
* - 3. A list of dependencies on other RDDs
* - 4. Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
* - 5. Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
1.一组分区(Partition),即数据集基本组成单位;
2.一个计算每个分区的函数;
3.RDD之间依赖关系;
4.一个Partitioner,即RDD分片函数;
5.一个列表,存储存取每个Partition的优先位置(preferred location)
RDD表示只读分区数据集,对RDD进行改动,只能通过RDD转换操作,由一个RDD得到一个新的RDD,新的RDD包含了从其他RDD衍生所必需的信息,RDDs之间存在依赖,RDD执行是按照血缘关系延时计算,如果血缘关系较长,可以通过持久化RDD来切断血缘关系.
存储弹性 : 内存与磁盘的自动切换.
容错弹性 : 数据丢失可以自动恢复.
计算弹性 : 计算出错重试机制.
分片弹性 : 可根据需要重新分片.
RDD逻辑上是分区的,每个分区数据是抽象存在的,计算时会通过一个compute函数得到每个分区数据,如果RDD是通过已有文件系统构建,则compute函数是读取指定文件系统中数据,如果RDD是通过其他RDD转换而来,则compute函数是执行转换逻辑将其他RDD数据进行转换.
RDD是只读的,要想改变RDD中数据,只能在现有RDD基础上创建新的RDD.
由一个RDD转换到另一个RDD,可以通过丰富的操作算子实现,不再像MapReduce那样只能写map和reduce.
RDD操作算子包括两类,一类是
transformations,它是用来将RDD进行转化,构建RDD的血缘关系,另一类是actions,它是用来触发RDD计算得到RDD相关计算结果或者将RDD保存文件系统中.
如图所示,RDDs通过操作算子进行转换,转换得到新RDD包含了从其他RDDs衍生所必需的信息,RDDs之间维护着这种血缘关系,也称之为依赖.
依赖包括两种,一种是窄依赖,RDDs之间分区是一一对应,另一种是宽依赖,下游RDD的每个分区与上游RDD(也称之为父RDD)的每个分区都有关,是多对多关系.
如果在应用程序中多次使用同一个RDD时,可以将该RDD缓存起来,该RDD只有在第一次计算时会根据血缘关系得到分区数据,在后续其他地方用到该RDD时,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用.
如图所示,RDD-1经过一系列转换后得到RDD-n并保存到HDFS,RDD-1在这一过程中会有个中间结果,如果将其缓存到内存,那么在随后RDD-1转换到RDD-m这一过程中,就不会计算其之前的RDD-0.
虽然RDD血缘关系天然地可以实现容错,当RDD某个分区数据失败或丢失,可以通过血缘关系重建,但是对于长时间迭代型应用来说随着迭代进行,RDDs之间血缘关系会越来越长,一旦在后续迭代过程中出错,则需要通过非常长的血缘关系去重建,势必影响性能.
为此,RDD支持checkpoint将数据保存到持久化存储中,这样就可以切断之前血缘关系,因为checkpoint后的RDD不需要知道它的父RDDs,它可以从checkpoint处拿到数据.
在Spark中,RDD被表示为对象,通过对象方法调用RDD进行转换,经过一系列的
transformations定义RDD之后,就可以调用actions触发RDD计算,action可以是向应用程序返回结果(count,collect等),或者是向存储系统保存数据(saveAsTextFile等).
在Spark中,只有遇到action才会执行RDD计算(即延迟计算),这样在运行时可以通过管道方式传输多个转换.
使用Spark开发者需要编写一个Driver程序,它被提交到集群以调度运行Worker,Driver中定义了一个或多个RDD.并调用RDD上的action.Worker则执行RDD分区计算任务.
Spark创建RDD创建方式可以分为三种:
1.从集合中创建RDD
2.从外部存储创建RDD
3.从其他RDD创建
从集合中创建RDD,Spark主要提供了两种函数 : parallelize和makeRDD
1.使用parallelize()从集合创建RDD
scala> val rdd = sc.parallelize(Array(511,611,711))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.collect
res0: Array[Int] = Array(511, 611, 711)
scala>
2.使用makeRDD()从集合创建RDD
scala> val makerdd = sc.makeRDD(Array(511,611,711))
makerdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:24
scala> makerdd.collect
res1: Array[Int] = Array(511, 611, 711)
scala>
除了在本地文件系统,还有所有Hadoop支持数据集,比如HDFS/Cassandra/HBase等.
详见 1.3.4 数据读取保存
scala> sc.textFile("/opt/module/spark/input/wordcount/wordcount_001.txt")
res2: org.apache.spark.rdd.RDD[String] = /opt/module/spark/input/wordcount/wordcount_001.txt MapPartitionsRDD[3] at textFile at <console>:25
scala>
详见1.3.2.3 RDD 转换
RDD整体分为Value类型和Key-Value类型
map(func) Method作用 : 返回一个新RDD,该RDD由每一个输入元素经过func函数转换后组成.
创建RDD
scala> val rdd = sc.parallelize(Array(511,611,711))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.collect
res0: Array[Int] = Array(511, 611, 711)
scala>
打印RDD最终结果
scala> rdd.map((_,1)).collect
res4: Array[(Int, Int)] = Array((511,1), (611,1), (711,1))
scala>
将所有元素RDD*2,最终结果
scala> rdd.map((_*2)).collect
res5: Array[Int] = Array(1022, 1222, 1422)
scala>
mapPartitions(func) Method作用 : 类似于map,但独立地在RDD每一个分片上运行,因此在类型为T的RDD上运行时,func函数类型必须是Iterator[T] => Iterator[U]
假设有N个元素,有M个分区,那么map函数将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区.
scala> rdd.mapPartitions(_.map(_*2)).collect
res11: Array[Int] = Array(1022, 1222, 1422)
scala>
mapPartitionsWithIndex(func) Method作用 : 类似于mapPartitions,但func带有一个整数参数表示分片索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U];
scala> rdd.mapPartitionsWithIndex((index,items)=>(items.map((index,_)))).collect
res13: Array[(Int, Int)] = Array((1,511), (2,611), (3,711))
scala>
flatMap(func) Method作用 : 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
scala> val text = sc.textFile("/core_flow/spark/input/wordcount/wordcount_001.txt")
text: org.apache.spark.rdd.RDD[String] = /core_flow/spark/input/wordcount/wordcount_001.txt MapPartitionsRDD[15] at textFile at <console>:24
scala> text.flatMap(_.split(" ")).collect
res16: Array[String] = Array(hadoop, spark, hive, hadoop, spark, hadoop, hbase, flume, hive, scala, java, oozie)
scala>
map()与mapPartition()区别1.map() : 每次处理一条数据
2.mapPartition() : 每次处理一个分区的数据,这个分区的数据处理完后,原RDD中分区的数据才能释放,可能导致OOM.
3.开发指导 : 当内存空间较大的时候建议使用mapPartition(),以提高处理效率.
glom Method作用 : 将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]
scala> rdd.glom.collect
res17: Array[Array[Int]] = Array(Array(), Array(511), Array(611), Array(711))
scala>
groupBy(func) Method作用 : 分组按照传入函数的返回值进行分组,将相同的key对应的值放入一个迭代器.
scala> rdd.groupBy(_ % 2).collect
res18: Array[(Int, Iterable[Int])] = Array((1,CompactBuffer(611, 711, 511)))
scala>
filter(func) Method作用 : 过滤返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成.
scala> rdd.filter(_%3==0).collect
res20: Array[Int] = Array(711)
scala>
sample(withReplacement,fraction,seed) Method作用 : 以指定随机种子随机抽样出数量为fraction的数据,withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样,seed用于指定随机数生成器种子.
scala> val rdd = sc.parallelize(1 to 100)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at parallelize at <console>:24
scala> rdd.sample(false,0.1,3).collect
res22: Array[Int] = Array(1, 33, 37, 50, 59, 69, 75, 78, 85, 98)
scala>
distinct([numTasks])) Method作用 : 对源RDD进行去重后返回一个新的RDD,默认情况下,只有8个并行任务来操作,但是可以传入一个可选的numTasks参数改变它.
使用distinct()对其去重操作.
scala> rdd.distinct(4).collect
res23: Array[Int] = Array(84, 100, 96, 52, 56, 4, 76, 16, 28, 80, 48, 32, 36, 24, 64, 92, 40, 72, 8, 12, 20, 60, 44, 88, 68, 13, 41, 61, 81, 21, 77, 53, 97, 25, 29, 65, 73, 57, 93, 33, 37, 45, 1, 89, 17, 69, 9, 85, 49, 5, 34, 82, 66, 22, 54, 98, 46, 30, 14, 50, 62, 42, 74, 90, 6, 70, 18, 38, 86, 58, 78, 26, 94, 10, 2, 19, 39, 15, 47, 71, 55, 95, 79, 59, 11, 35, 27, 75, 51, 23, 63, 83, 67, 3, 7, 91, 31, 87, 43, 99)
scala>
coalesce(numPartitions) Method作用 : 缩减分区数,用于大数据集过滤后,提高小数据集的执行效率.
创建4个分区RDD,对其缩减分区.
创建RDD/查看RDD分区数/对RDD重新分区/查看新RDD分区数
scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at <console>:24
scala> rdd.partitions.size
res24: Int = 4
scala> val coalesceRDD = rdd.coalesce(3)
coalesceRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[28] at coalesce at <console>:26
scala> coalesceRDD.partitions.size
res25: Int = 3
scala>
repartition(numPartitions) Method作用 : 根据分区数,重新通过网络随机洗牌所有数据.
创建4个分区RDD,对其重新分区.
创建RDD/查看RDD分区数/对RDD重新分区/查看新RDD分区数
scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[29] at parallelize at <console>:24
scala> rdd.partitions.size
res26: Int = 4
scala> val rerdd = rdd.repartition(2)
rerdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[33] at repartition at <console>:26
scala> rerdd.partitions.size
res27: Int = 2
scala>
coalesce与repartition区别1.
coalesce重新分区,可以选择是否进行shuffle过程,由参数shuffle: Boolean = false/true决定.2.
repartition实际上是调用coalesce,进行shuffle过程,源码演示:
def repartition(numpartitions: int)(implicit ord: ordering[t] = null): rdd[t] = withscope {
coalesce(numpartitions, shuffle = true)
}
sortBy(func,[ascending],[numTasks]) Method作用 : 使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为正序.
创建RDD,按照不同规则进行排序 | 按照自身大小排序 / 按照与3余数大小排序 / 按照倒序排序
scala> val rdd = sc.parallelize(List(2,1,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at parallelize at <console>:24
scala> rdd.sortBy(x => x).collect()
res29: Array[Int] = Array(1, 2, 3, 4)
scala> rdd.sortBy(x => x%3).collect()
res30: Array[Int] = Array(3, 1, 4, 2)
scala> rdd.sortBy(x => x,false).collect()
res31: Array[Int] = Array(4, 3, 2, 1)
scala>
pipe(command,[envVars]) Method作用 : 管道针对每个分区,都执行一个shell脚本,返回输出RDD.
创建脚本,使用管道将脚本作用于RDD上
[root@systemhub511 ~]# vim /opt/module/spark/input/pipe.sh
[root@systemhub511 ~]# chmod 777 /opt/module/spark/input/pipe.sh
vim pipe.sh
#!/bin/
shecho"Start"
while read LINE;do
echo ">>>" ${LINE}
done
scala> rdd.pipe("/opt/module/spark/pipe.sh").collect
res18: Array[String] = Array(Start, >>>hi, >>>Hello, >>>how, >>>are, >>>you)
scala>
union(otherDataset) Method作用 : 对源RDD和参数RDD求并集后返回一个新RDD | 创建两个RDD进行并集计算
scala> var rdd1 = sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> var rdd2 = sc.parallelize(5 to 10)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> rdd1.union(rdd2).collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10)
scala>
subtract(otherDataset) Method作用 : 计算差的一种函数,去除两个RDD中相同元素,不同的RDD将保留下来
scala> rdd1.subtract(rdd2).collect
res0: Array[Int] = Array(2, 4, 1, 3)
scala>
intersection(otherDataset) Method作用 : 对源RDD和参数RDD求交集后,返回一个新的RDD
scala> rdd1.intersection(rdd2).collect
res1: Array[Int] = Array(5)
scala>
cartesian(otherDataset) Method作用 : 笛卡尔积 (尽量避免使用)
scala> rdd1.cartesian(rdd2).collect
res2: Array[(Int, Int)] = Array((1,5), (1,6), (1,7), (2,5), (2,6), (2,7), (1,8), (1,9), (1,10), (2,8), (2,9), (2,10), (3,5), (3,6), (3,7), (4,5), (4,6), (4,7), (5,5), (5,6), (5,7), (3,8), (3,9), (3,10), (4,8), (4,9), (4,10), (5,8), (5,9), (5,10))
scala>
zip(otherDataset) Method作用 : 将两个RDD组合成Key/Value形式RDD,默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常.
scala> rdd1.zip(rdd2).collect
res4: Array[(Int, Int)] = Array((1,6), (2,7), (3,8), (4,9), (5,10))
scala>
partitionBy Method作用 : 对pairRDD进行分区操作,如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区,否则会生成ShuffleRDD,即会产生shuffle过程.
scala> val rdd1 = sc.parallelize(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")),4)
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd1.mapPartitionsWithIndex((i,t)=>t.map((i,_))).collect
res3: Array[(Int, (Int, String))] = Array((0,(1,A)), (1,(2,B)), (2,(3,C)), (3,(4,D)))
scala> rdd1.partitionBy(new org.apache.spark.HashPartitioner(2))
res5: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[3] at partitionBy at <console>:27
scala> res5.partitions.size
res6: Int = 2
scala>
reduceByKey(func,[numTasks]) Method在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定reduce函数,将相同key值聚合到一起,reduce任务个数可以通过第二个可选参数来设置.
scala> val rdd = sc.parallelize(List(("female",1),("male",5),("female",5),("male",2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[4] at parallelize at <console>:24
scala> rdd.reduceByKey((x,y)=>x+y).collect
res7: Array[(String, Int)] = Array((female,6), (male,7))
scala>
groupByKey Method作用 : groupByKey也是对每个key进行操作,但只生成一个seq.
scala> rdd.groupByKey(2).collect
res8: Array[(String, Iterable[Int])] = Array((female,CompactBuffer(5, 1)), (male,CompactBuffer(5, 2)))
scala>
reduceByKey与groupByKey 区别1.reduceByKey : 按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v]
2.groupByKey : 按照key进行分组,直接进行shuffle
3.开发指导 : reduceByKey比groupByKey,建议使用reduceByKey,但是需要注意是否会影响业务逻辑.
aggregateByKey Method参数 :
(zeroValue:U,[partitioner:Partitioner])(seqOp: (U, V) => U,combOp: (U, U) => U)1.作用 : 在kv对的RDD中,按key将value进行分组合并,合并时将每个value和初始值作为seq函数参数进行计算,返回结果作为一个新的kv对,然后再将结果按照key进行合并,最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出.
2.参数描述 :
zeroValue: 给每一个分区中的每一个key一个初始值.
seqOp: 函数用于在每一个分区中用初始值逐步迭代value
combOp: 函数用于合并每个分区中的结果
创建一个pairRDD,取出每个分区相同key对应值的最大值然后相加.
scala> val rdd = sc.parallelize(List(("a",3),("a",2),("c",4),("b",3),("c",6),("c",8)),2)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> rdd.aggregateByKey(0)(math.max(_,_),_+_).collect
res9: Array[(String, Int)] = Array((b,3), (a,3), (c,12))
scala>
scala> rdd.aggregateByKey(0)(_+_,_+_).collect
res10: Array[(String, Int)] = Array((b,3), (a,5), (c,18))
scala> rdd.reduceByKey(_+_).collect
res11: Array[(String, Int)] = Array((b,3), (a,5), (c,18))
scala>
foldByKey Method参数 : (zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
作用 : aggregateByKey的简化操作,seqop和combop相同
scala> rdd.foldByKey(0)(_+_).collect
res12: Array[(String, Int)] = Array((b,3), (a,5), (c,18))
scala>
combineByKey[C] Method参数 : (createCombiner:V=>C,mergeValue:(C,V)=>C,mergeCombiners:(C,C)=>C)
作用 : 针对相同K,将V合并成一个集合.
参数描述 :
1.
createCombiner:combineByKey()会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素,combineByKey()会使用一个叫作createCombiner()函数来创建那个键对应的累加器初始值.2.
mergeValue: 如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并.3.
mergeCombiners: 由于每个分区都是独立处理,因此对于同一个键可以有多个累加器,如果有两个或者更多的分区都有对应同一个键的累加器,就需要使用用户提供的mergeCombiners()方法将各个分区的结果进行合并.
scala> rdd.combineByKey((_,1),(acc:(Int,Int),v)=>(acc._1+v,acc._2+1),(acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2)).collect
res15: Array[(String, (Int, Int))] = Array((b,(3,1)), (a,(5,2)), (c,(18,3)))
scala>
sortByKey([ascending],[numTasks]) Method作用 : 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
scala> rdd.sortByKey().collect
res17: Array[(String, Int)] = Array((a,3), (a,2), (b,3), (c,6), (c,8), (c,4))
scala> rdd.sortByKey(false).collect
res19: Array[(String, Int)] = Array((c,4), (c,6), (c,8), (b,3), (a,3), (a,2))
scala>
mapValues Method针对于(K,V)形式的类型只对V进行操作
scala> rdd.mapValues(_*2).collect
res20: Array[(String, Int)] = Array((a,6), (a,4), (c,8), (b,6), (c,12), (c,16))
scala>
join(otherDataset,[numTasks]) Method作用 : 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[20] at parallelize at <console>:24
scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[21] at parallelize at <console>:24
scala> rdd.join(rdd1).collect
res21: Array[(Int, (String, Int))] = Array((1,(a,4)), (2,(b,5)), (3,(c,6)))
scala> rdd.leftOuterJoin(rdd1).collect
res22: Array[(Int, (String, Option[Int]))] = Array((1,(a,Some(4))), (2,(b,Some(5))), (3,(c,Some(6))))
scala> rdd.rightOuterJoin(rdd1).collect
res23: Array[(Int, (Option[String], Int))] = Array((1,(Some(a),4)), (2,(Some(b),5)), (3,(Some(c),6)))
scala>
cogroup(otherDataset,[numTasks]) Method作用 : 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD
scala> rdd.cogroup(rdd1).collect
res24: Array[(Int, (Iterable[String], Iterable[Int]))] = Array((1,(CompactBuffer(a),CompactBuffer(4))), (2,(CompactBuffer(b),CompactBuffer(5))), (3,(CompactBuffer(c),CompactBuffer(6))))
scala>
reduce(func) Method作用 : 通过func函数聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据
scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[33] at parallelize at <console>:24
scala> rdd.reduce(_+_)
res25: Int = 55
scala>
collect() Method作用 : 在驱动程序中,以数组的形式返回数据集的所有元素
scala> rdd.collect
res26: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala>
count() Method作用 : 返回RDD中元素的个数
scala> rdd.count
res27: Long = 10
scala>
first() Method作用 : 返回RDD中第一个元素
scala> rdd.first
res28: Int = 1
scala>
take(n) Method作用 : 返回一个由RDD前n个元素组成的数组
scala> rdd.take(2)
res30: Array[Int] = Array(1, 2)
scala>
takeOrdered(n) Method作用 : 返回该RDD排序后的前n个元素组成的数组
scala> rdd.takeOrdered(3)
res31: Array[Int] = Array(1, 2, 3)
scala>
aggregate Method参数 :
(zeroValue: U)(seqOp: (U, T) ⇒U, combOp: (U, U) ⇒U)作用 : aggregate函数将每个分区里面的元素通过seqOp和初始值进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作,这个函数最终返回的类型不需要和RDD中元素类型一致.
scala> rdd.aggregate(0)(_+_,_+_)
res32: Int = 55
scala>
fold(num)(func) Method作用 : 折叠操作,aggregate的简化操作,seqop和combop一样
scala> rdd.fold(0)(_+_)
res34: Int = 55
scala>
saveAsTextFile(path) Method作用 : 将数据集元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本.
saveAsSequenceFile(path) Method作用 : 将数据集中的元素以Hadoop sequencefile格式保存到指定目录下,可以使HDFS或者其他Hadoop支持的文件系统.
saveAsObjectFile(path) Method作用 : 用于将RDD中元素序列化成对象,存储到文件中.
countByKey() Method作用 : 针对(K,V)类型RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数.
scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[35] at parallelize at <console>:24
scala> rdd.countByKey
res35: scala.collection.Map[Int,Long] = Map(3 -> 2, 1 -> 3, 2 -> 1)
scala>
foreach(func) Method作用 : 在数据集的每一个元素上,运行函数func进行更新
scala> rdd.foreach(print)
在实际开发中往往需要开发者定义一些对于RDD操作,那么此时需要主要的是,初始化工作是在Driver端进行,而实际运行程序是在Executor端进行,这就涉及到了跨进程通信,跨进程通信是需要序列化操作.
在这个方法中所调用的方法
isMatch()是定义在Search这个类中,实际上调用的是this.isMatch(),this表示Search这个类的对象,程序在运行过程中需要将Search对象序列化以后传递到Executor端.
在这个方法中所调用的方法
query是定义在Search这个类中的字段,实际上调用的是this.query,this表示Search这个类的对象,程序在运行过程中需要将Search对象序列化以后传递到Executor端.
Create Search.scala
package com.geekparkhub.core.spark.application.methods
import org.apache.spark.rdd.RDD
/**
* Geek International Park | 极客国际公园
* GeekParkHub | 极客实验室
* Website | https://www.geekparkhub.com/
* Description | Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见
* HackerParkHub | 黑客公园枢纽
* Website | https://www.hackerparkhub.org/
* Description | 以无所畏惧的探索精神 开创未知技术与对技术的崇拜
* GeekDeveloper : JEEP-711
*
* @author system
* <p>
* Search
* <p>
*/
class Search(query: String) extends Serializable {
// 过滤出包含字符串数据
def isMatch(s: String): Boolean = {
s.contains(query)
}
// 过滤出包含字符串RDD
def getMatch1(rdd: RDD[String]): RDD[String] = {
rdd.filter(isMatch)
}
// 过滤出包含字符串RDD
def getMatche2(rdd: RDD[String]): RDD[String] = {
rdd.filter(x => x.contains(query))
}
}
Create TransFormAction.scala
package com.geekparkhub.core.spark.application.methods
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* Geek International Park | 极客国际公园
* GeekParkHub | 极客实验室
* Website | https://www.geekparkhub.com/
* Description | Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见
* HackerParkHub | 黑客公园枢纽
* Website | https://www.hackerparkhub.org/
* Description | 以无所畏惧的探索精神 开创未知技术与对技术的崇拜
* GeekDeveloper : JEEP-711
*
* @author system
* <p>
* TransFormAction
* <p>
*/
object TransFormAction {
def main(args: Array[String]): Unit = {
// 创建SpakConf
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("TransFormAction")
// 创建SC
val sc = new SparkContext(sparkConf)
// 创建RDD
val word: RDD[String] = sc.parallelize(Array("abc", "dcd"))
// 创建Search对象
val search = new Search("a")
// 调用方法
val searched: RDD[String] = search.getMatch1(word)
// 循环输出
searched.collect().foreach(println)
// 关闭资源
sc.stop()
}
}
RDD只支持粗粒度转换,即在大量记录上执行的单个操作,将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区,RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区.
创建RDD依赖关系
scala> sc.textFile("/core_flow/spark/input/wordcount/wordcount_001.txt")
res0: org.apache.spark.rdd.RDD[String] = /core_flow/spark/input/wordcount/wordcount_001.txt MapPartitionsRDD[1] at textFile at <console>:25
scala> res0.flatMap(_.split(" "))
res2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:27
scala> res2.map((_,1))
res3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:29
scala> res3.reduceByKey(_+_)
res4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:31
scala>
分别查看四个RDD依赖关系
res0.toDebugString
scala> res0.toDebugString
res5: String =
(2) /core_flow/spark/input/wordcount/wordcount_001.txt MapPartitionsRDD[1] at textFile at <console>:25 []
| /core_flow/spark/input/wordcount/wordcount_001.txt HadoopRDD[0] at textFile at <console>:25 []
scala>
res2.toDebugString
scala> res2.toDebugString
res6: String =
(2) MapPartitionsRDD[2] at flatMap at <console>:27 []
| /core_flow/spark/input/wordcount/wordcount_001.txt MapPartitionsRDD[1] at textFile at <console>:25 []
| /core_flow/spark/input/wordcount/wordcount_001.txt HadoopRDD[0] at textFile at <console>:25 []
scala>
res3.toDebugString
scala> res3.toDebugString
res7: String =
(2) MapPartitionsRDD[3] at map at <console>:29 []
| MapPartitionsRDD[2] at flatMap at <console>:27 []
| /core_flow/spark/input/wordcount/wordcount_001.txt MapPartitionsRDD[1] at textFile at <console>:25 []
| /core_flow/spark/input/wordcount/wordcount_001.txt HadoopRDD[0] at textFile at <console>:25 []
scala>
res4.toDebugString
scala> res4.toDebugString
res8: String =
(2) ShuffledRDD[4] at reduceByKey at <console>:31 []
+-(2) MapPartitionsRDD[3] at map at <console>:29 []
| MapPartitionsRDD[2] at flatMap at <console>:27 []
| /core_flow/spark/input/wordcount/wordcount_001.txt MapPartitionsRDD[1] at textFile at <console>:25 []
| /core_flow/spark/input/wordcount/wordcount_001.txt HadoopRDD[0] at textFile at <console>:25 []
scala>
窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用.
宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition,会引起shuffle过程.
DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage.
对于窄依赖,partition的转换处理在Stage中完成计算,对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage依据.
RDD任务切分中间分为 : Application / Job / Stage / Task
Application : 初始化一个SparkContext即生成一个Application.
Job : 一个Action算子就会生成一个Job.
Stage : 根据RDD之间的依赖关系的不同将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage.
Task : Stage是一个TaskSet,将Stage划分的结果发送到不同的Executor执行即为一个Task.
Application->Job->Stage->Task 每一层都是1对n的关系
RDD通过persist方法或cache方法可以将前面的计算结果缓存,默认情况下persist()会把数据以序列化形式缓存在JVM 的堆空间中.
但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用
缓存有可能丢失或者存储存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行,通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立,因此只需要计算丢失的部分即可,并不需要重算全部Partition.
Spark中对于数据的保存除了持久化操作之外,还提供了一种检查点的机制,检查点(本质是通过将RDD写入Disk做检查点)是为了通过lineage做容错的辅助,lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的RDD开始重做Lineage,就会减少开销,检查点通过将数据写入到HDFS文件系统实现了RDD的检查点功能.
为当前RDD设置检查点,该函数将会创建一个二进制的文件,并存储到checkpoint目录中,该目录是用SparkContext.setCheckpointDir()设置的,在checkpoint的过程中,该RDD所有依赖于父RDD中的信息将全部被移除,对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发.
设置检查点
scala> sc.setCheckpointDir("hdfs://systemhub511:9000/core_flow/spark/checkpoint")
创建RDD
scala> val rdd = sc.parallelize(Array("systemhub511"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:24
将RDD转换为携带当前时间戳并做checkpoint
scala> val check = rdd.map(_+System.currentTimeMillis)
check: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at map at <console>:26
scala>
多次打印结果
scala> check.collect
res10: Array[String] = Array(systemhub5111559138263898)
scala> check.collect
res11: Array[String] = Array(systemhub5111559138266443)
scala> check.collect
res12: Array[String] = Array(systemhub5111559138267862)
scala>
Spark目前支持Hash分区和Range分区,开发者也可以自定义分区,Hash分区为当前默认分区,Spark中分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle过程属于哪个分区和Reduce的个数.
查看RDD分区器
scala> rdd.partitioner
res14: Option[org.apache.spark.Partitioner] = None
HashPartitioner分区的原理 : 对于给定的key,计算其hashCode,并除以分区个数取余,如果余数小于0,则用余数+分区的个数,否则加0,最后返回的值就是这个key所属的分区ID.
Hash分区实操
scala> val nopar = sc.parallelize(List((1,3),(1,2),(2,4),(2,3),(3,6),(3,8)),8)
nopar: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:25
scala> nopar.mapPartitionsWithIndex((index,iter)=>{Iterator(index.toString+":"+iter.mkString("|"))}).collect
res15: Array[String] = Array(0:, 1:(1,3), 2:(1,2), 3:(2,4), 4:, 5:(2,3), 6:(3,6), 7:(3,8))
scala> val hashpar = nopar.partitionBy(new org.apache.spark.HashPartitioner(7))
hashpar: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[9] at partitionBy at <console>:27
scala> hashpar.count
res20: Long = 6
scala> hashpar.partitioner
res21: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@7)
scala> hashpar.mapPartitions(iter => Iterator(iter.length)).collect()
res22: Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)
scala>
HashPartitioner分区弊端 : 可能导致每个分区中数据量不均匀,极端情况下会导致某些分区拥有RDD全部数据.
RangePartitioner作用 : 将一定范围内数映射到某一个分区内,尽量保证每个分区中数据量均匀,而且分区与分区之间是有序,一个分区中元素肯定都是比另一个分区内元素小或者大,但是分区内元素是不能保证顺序,简单的说就是将一定范围内的数映射到某一个分区内.
实现过程 :
1.先从整个RDD中抽取出样本数据,将样本数据排序,计算出每个分区最大key值,形成一个Array[KEY]类型的数组变量rangeBounds.
2.判断key在rangeBounds中所处的范围,给出该key值在下一个RDD中分区id下标,该分区器要求RDD中KEY类型必须是可排序.
要实现自定义分区器,需要继承org.apache.spark.Partitioner类并实现下面三个方法
1.numPartitions: Int : 返回创建出来的分区数
2.getPartition(key: Any): Int : 返回给定键的分区编号(0到numPartitions-1)
3.equals() : Java 判断相等性的标准方法,这个方法的实现非常重要,Spark需要用这个方法来检查分区器对象是否和其他分区器实例相同,这样Spark才可以判断两个RDD的分区方式是否相同.
4.定义自定义分区类 | Create CustomerPartitioner.scala
package com.geekparkhub.core.spark.application.partitioner
import org.apache.spark.Partitioner
/**
* Geek International Park | 极客国际公园
* GeekParkHub | 极客实验室
* Website | https://www.geekparkhub.com/
* Description | Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见
* HackerParkHub | 黑客公园枢纽
* Website | https://www.hackerparkhub.org/
* Description | 以无所畏惧的探索精神 开创未知技术与对技术的崇拜
* GeekDeveloper : JEEP-711
*
* @author system
* <p>
* CustomerPartitioner
* <p>
*/
class CustomerPartitioner(partitions: Int) extends Partitioner {
override def numPartitions: Int = partitions
override def getPartition(key: Any): Int = {
0
}
}
Create PartitionerAction.scala
package com.geekparkhub.core.spark.application.partitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* Geek International Park | 极客国际公园
* GeekParkHub | 极客实验室
* Website | https://www.geekparkhub.com/
* Description | Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见
* HackerParkHub | 黑客公园枢纽
* Website | https://www.hackerparkhub.org/
* Description | 以无所畏惧的探索精神 开创未知技术与对技术的崇拜
* GeekDeveloper : JEEP-711
*
* @author system
* <p>
* PartitionerAction
* <p>
*/
object PartitionerAction {
def main(args: Array[String]): Unit = {
// 创建SpakConf
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("TransFormAction")
// 创建SC
val sc = new SparkContext(sparkConf)
// 创建RDD
val word: RDD[String] = sc.parallelize(Array("abc", "dcd"))
// 将元素转换为元祖
val wordAndOne: RDD[(String, Int)] = word.map((_, 1))
// 自定义分区
val partitioned: RDD[(String, Int)] = wordAndOne.partitionBy(new CustomerPartitioner(2))
// 查看分区后分区结果
val indexAndData: RDD[(Int, (String, Int))] = partitioned.mapPartitionsWithIndex((i,t)=>t.map((i,_)))
// 打印数据
indexAndData.collect().foreach(println)
// 关闭资源
sc.stop()
}
}
Log Println
(0,(abc,1))
(0,(dcd,1))
Spark数据读取及数据保存可以从两个维度来作区分 : 文件格式以及文件系统
文件格式分为 : Text文件 / Json文件 / Csv文件 / Sequence文件以及Object文件
文件系统分为 : 本地文件系统 / HDFS / HBASE以及数据库
1.数据读取 : textFile(String)
scala> sc.textFile("hdfs://systemhub511:9000/core_flow/spark/input/wordcount/wordcount_001.txt")
res23: org.apache.spark.rdd.RDD[String] = hdfs://systemhub511:9000/core_flow/spark/input/wordcount/wordcount_001.txt MapPartitionsRDD[12] at textFile at <console>:26
scala> res23.toDebugString
res25: String =
(2) hdfs://systemhub511:9000/core_flow/spark/input/wordcount/wordcount_001.txt MapPartitionsRDD[12] at textFile at <console>:26 []
| hdfs://systemhub511:9000/core_flow/spark/input/wordcount/wordcount_001.txt HadoopRDD[11] at textFile at <console>:26 []
scala>
2.数据保存 : saveAsTextFile(String)
scala> hdfsFile.saveAsTextFile("/core_flow/spark/output/wordcount/")
如果JSON文件中每一行就是一个JSON记录,那么可以通过将JSON文件当做文本文件来读取,然后利用相关的JSON库对每一条数据进行JSON解析.
使用RDD读取JSON文件处理很复杂,同时SparkSQL集成了很好的处理JSON文件方式,所以应用中多是采用SparkSQL处理JSON文件.
1.导入解析json所需包名
scala> import scala.util.parsing.json.JSON
import scala.util.parsing.json.JSON
scala>
2.在HDFS创建存放JSON目录
[root@systemhub511 ~]# hadoop fs -mkdir -p /core_flow/spark/json/001
3.上传json文件到HDFS
[root@systemhub511 ~]# hadoop fs -put /opt/module/spark/examples/src/main/resources/people.json /core_flow/spark/json/001/
4.读取文件
scala> val json = sc.textFile("hdfs://systemhub511:9000/core_flow/spark/json/001/people.json")
json: org.apache.spark.rdd.RDD[String] = hdfs://systemhub511:9000/core_flow/spark/json/001/people.json MapPartitionsRDD[14] at textFile at <console>:26
scala>
5.解析json数据
scala> val result = json.map(JSON.parseFull)
result: org.apache.spark.rdd.RDD[Option[Any]] = MapPartitionsRDD[15] at map at <console>:28
scala>
6.打印解析结果
scala> result.collect
res26: Array[Option[Any]] = Array(Some(Map(name -> Michael)), Some(Map(name -> Andy, age -> 30.0)), Some(Map(name -> Justin, age -> 19.0)))
scala>
SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计一种平面文件(FlatFile).
Spark有专门用来读取SequenceFile接口,在SparkContext中,可以调用sequenceFile<a href="path">keyClass, valueClass</a> | SequenceFile文件只针对PairRDD
1.创建RDD
scala> val rdd = sc.parallelize(Array((1,2),(3,4),(5,6)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[16] at parallelize at <console>:26
scala>
2.将RDD保存为Sequence文件
scala> rdd.saveAsSequenceFile("file:///opt/module/spark/seqFile")
3.查看该文件
[root@systemhub511 ~]# cd /opt/module/spark/seqFile/
[root@systemhub511 seqFile]# ll -a
总用量 28
drwxr-xr-x. 2 root root 4096 5月 29 23:57 .
drwxr-xr-x. 21 geekdeveloper geekdeveloper 4096 5月 30 00:05 ..
-rw-r--r--. 1 root root 92 5月 29 23:57 part-00000
-rw-r--r--. 1 root root 12 5月 29 23:57 .part-00000.crc
-rw-r--r--. 1 root root 108 5月 29 23:57 part-00003
-rw-r--r--. 1 root root 12 5月 29 23:57 .part-00003.crc
-rw-r--r--. 1 root root 0 5月 29 23:57 _SUCCESS
-rw-r--r--. 1 root root 8 5月 29 23:57 ._SUCCESS.crc
[root@systemhub511 seqFile]# cat part-00000
SEQ org.apache.hadoop.io.IntWritable org.apache.hadoop.io.IntWritabler[-o���]h�~u���
[root@systemhub511 seqFile]#
4.读取Sequence文件
scala> val seq = sc.sequenceFile[Int,Int]("file:///opt/module/spark/seqFile")
seq: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[19] at sequenceFile at <console>:26
scala>
5.打印读取后的Sequence文件
scala> seq.collect
res14: Array[(Int, Int)] = Array((1,2), (3,4), (5,6))
对象文件是将对象序列化后保存文件,采用Java序列化机制,可以通过objectFile<a href="path">k,v</a>函数接收一个路径,读取对象文件,返回对应RDD,也可以通过调用saveAsObjectFile()实现对对象文件输出,因为是序列化所以要指定类型.
1.创建RDD
scala> val rdd = sc.parallelize(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:26
scala>
2.将RDD保存为Object文件
scala> rdd.saveAsObjectFile("file:///opt/module/spark/objectFile")
3.查看该文件
[root@systemhub511 ~]# cd /opt/module/spark/objectFile/
[root@systemhub511 objectFile]# ll
总用量 8
-rw-r--r--. 1 root root 138 5月 30 00:05 part-00000
-rw-r--r--. 1 root root 138 5月 30 00:05 part-00003
-rw-r--r--. 1 root root 0 5月 30 00:05 _SUCCESS
[root@systemhub511 objectFile]# cat part-00000
SEQ!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritable� �L�h�l:T���#��ur[IM�`&v겥xp
[root@systemhub511 objectFile]#
4.读取Object文件
scala> val objFile = sc.objectFile[(Int)]("file:///opt/module/spark/objectFile")
objFile: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[24] at objectFile at <console>:26
scala>
5.打印读取后的Sequence文件
objFile.collect
res19: Array[Int] = Array(1, 2, 3, 4)
Spark整个生态系统与Hadoop是完全兼容,所以对于Hadoop所支持的文件类型或者数据库类型,Spark也同样支持.
另外由于Hadoop的API有新旧两个版本,所以Spark为了能够兼容Hadoop所有版本,也提供了两套创建操作接口.
对于外部存储创建操作而言,hadoopRDD和newHadoopRDD是最为抽象的两个函数接口,主要包含以下四个参数 :1.
输入格式(InputFormat): 制定数据输入类型,如TextInputFormat等,新旧两个版本所引用版本分别是org.apache.hadoop.mapred.InputFormat和org.apache.hadoop.mapreduce.InputFormat(NewInputFormat)
2.键类型 : 指定[K,V]键值对中K类型
3.值类型: 指定[K,V]键值对中V类型
4.分区值 : 指定由外部存储生成RDD的partition数量最小值,如果没有指定系统会使用默认值defaultMinSplits.其他创建操作API接口都是为了方便最终Spark程序开发者而设置的,是这两个接口高效实现版本,例如对于textFile而言,只有path这个指定文件路径参数,其他参数在系统内部指定了默认值.
1.在Hadoop中以压缩形式存储数据,不需要指定解压方式就能够进行读取,因为Hadoop本身有一个解压器会根据压缩文件后缀推断解压算法进行解压.
2.如果用Spark从Hadoop中读取某种类型数据不知道怎么读取的时候,上网查找一个使用map-reduce时候是怎么读取这种这种数据,然后再将对应的读取方式改写成上面的hadoopRDD和newAPIHadoopRDD两个类即可.
支持通过JavaJDBC访问关系型数据库,需要通过JdbcRDD进行
0.添加mysql依赖
<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.15</version>
</dependency>
</dependencies>
1.Mysql读取 | Create JDBCConnection.scala
package com.geekparkhub.core.spark.application.dataconnections
import java.sql.DriverManager
import org.apache.spark.deploy.worker.DriverWrapper
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* Geek International Park | 极客国际公园
* GeekParkHub | 极客实验室
* Website | https://www.geekparkhub.com/
* Description | Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见
* HackerParkHub | 黑客公园枢纽
* Website | https://www.hackerparkhub.org/
* Description | 以无所畏惧的探索精神 开创未知技术与对技术的崇拜
* GeekDeveloper : JEEP-711
*
* @author system
* <p>
* JDBCConnection
* <p>
*/
object JDBCConnection {
def main(args: Array[String]): Unit = {
// 创建SpakConf
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JDBCConnection")
// 创建SC
val sc = new SparkContext(sparkConf)
// 定义JDBC连接属性信息
val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql://systemhub711:3306/company"
val userName = "root"
val passWd = "ax04854"
// 创建JDBC RDD
val JdbcRDD = new JdbcRDD[(Int, String)](sc, () => {
Class.forName(driver)
DriverManager.getConnection(url, userName, passWd)
}, "select id,name from staff where ? <= id and id <= ?",
1,
10,
1,
x => {
(x.getInt(1), x.getString(2))
}
)
// 打印JdbcRDD结果
JdbcRDD.collect().foreach(println)
// 关闭资源
sc.stop()
}
}
2.Mysql写入 | Create JBDCinsertData.scala
package com.geekparkhub.core.spark.application.dataconnections
import org.apache.spark.{SparkConf, SparkContext}
/**
* Geek International Park | 极客国际公园
* GeekParkHub | 极客实验室
* Website | https://www.geekparkhub.com/
* Description | Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见
* HackerParkHub | 黑客公园枢纽
* Website | https://www.hackerparkhub.org/
* Description | 以无所畏惧的探索精神 开创未知技术与对技术的崇拜
* GeekDeveloper : JEEP-711
*
* @author system
* <p>
* JBDCinsertData
* <p>
*/
object JBDCinsertData {
def main(args: Array[String]): Unit = {
// 创建SpakConf
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JBDCRead")
// 创建SC
val sc = new SparkContext(sparkConf)
// 创建数据
val data = sc.parallelize(List("Female", "Male", "Female"))
// 调用添加数据方法
data.foreachPartition(insertData)
}
// 添加数据方法
def insertData(iterator: Iterator[String]): Unit = {
Class.forName("com.mysql.jdbc.Driver").newInstance()
val conn = java.sql.DriverManager.getConnection("jdbc:mysql://systemhub711:3306/company", "root", "000000")
iterator.foreach(data => {
val ps = conn.prepareStatement("insert into staff(name) values(?)")
ps.setString(1, data)
ps.executeUpdate()
})
}
}
由于org.apache.hadoop.hbase.mapreduce.TableInputFormat类的实现,Spark可以通过Hadoop输入格式访问HBase,这个输入格式会返回键值对数据,其中键的类型为org. apache.hadoop.hbase.io.ImmutableBytesWritable,而值的类型为org.apache.hadoop.hbase.client.Result.
0.添加HBASE依赖
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.1</version>
</dependency>
1.HBase读取数据
package com.geekparkhub.core.spark.application.dataconnections
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.{NewHadoopRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}
/**
* Geek International Park | 极客国际公园
* GeekParkHub | 极客实验室
* Website | https://www.geekparkhub.com/
* Description | Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见
* HackerParkHub | 黑客公园枢纽
* Website | https://www.hackerparkhub.org/
* Description | 以无所畏惧的探索精神 开创未知技术与对技术的崇拜
* GeekDeveloper : JEEP-711
*
* @author system
* <p>
* HbaseConnection
* <p>
*/
object HbaseConnection {
def main(args: Array[String]): Unit = {
// 创建SpakConf
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HbaseConnection")
// 创建SC
val sc = new SparkContext(sparkConf)
//构建HBase配置信息
val conf: Configuration = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "systemhub511,systemhub611,systemhub711")
conf.set(TableInputFormat.INPUT_TABLE, "test")
// 读取HBASE数据
val hbaseRDD = new NewHadoopRDD(sc, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result], conf)
// 获取RowKey
val value: RDD[String] = hbaseRDD.map(x => Bytes.toString(x._2.getRow))
// 输出数据
value.collect().foreach(println)
// 关闭资源
sc.stop()
}
}
2.HBase写入数据
package com.geekparkhub.core.spark.application.dataconnections
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* Geek International Park | 极客国际公园
* GeekParkHub | 极客实验室
* Website | https://www.geekparkhub.com/
* Description | Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见
* HackerParkHub | 黑客公园枢纽
* Website | https://www.hackerparkhub.org/
* Description | 以无所畏惧的探索精神 开创未知技术与对技术的崇拜
* GeekDeveloper : JEEP-711
*
* @author system
* <p>
* HbaseWrite
* <p>
*/
object HbaseWrite {
def main(args: Array[String]): Unit = {
// 创建SpakConf
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HbaseWrite")
// 创建SC
val sc = new SparkContext(sparkConf)
// 创建RDD
val initialRDD: RDD[(Int, String, Int)] = sc.parallelize(List((1, "apple", 11), (2, "banana", 12), (3, "pear", 13)))
// 创建JobConf
val conf = new JobConf()
conf.set("hbase.zookeeper.quorum", "systemhub511,systemhub611,systemhub711")
conf.setOutputFormat(classOf[TableOutputFormat[ImmutableBytesWritable]])
conf.set(TableOutputFormat.OUTPUT_TABLE, "test")
// 定义 Hbase 添加数据方法
def convert(triple: (Int, String, Int)): (ImmutableBytesWritable, Put) = {
val put = new Put(Bytes.toBytes(triple._1))
put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(triple._2))
put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("price"), Bytes.toBytes(triple._3))(new ImmutableBytesWritable, put)
}
// 转换RDD
val writRDD: RDD[(ImmutableBytesWritable, Put)] = initialRDD.map(convert)
// 写入HBASE
writRDD.saveAsHadoopDataset(conf)
// 关闭资源
sc.stop()
}
}
累加器用来对信息进行聚合,通常在向Spark传递函数时,比如使用
map()函数或者用filter()传条件时,可以使用驱动器程序中定义变量,但是集群中运行每个任务都会得到这些变量的一份新副本,更新这些副本的值也不会影响驱动器中的对应变量,如果想实现所有分片处理时更新共享变量的功能,那么累加器可以实现想要的效果.
通过在驱动器中调用S
parkContext.accumulator(initialValue)方法,创建出存有初始值的累加器,返回值为org.apache.spark.Accumulator[T]对象,其中T是初始值initialValue的类型,Spark闭包里的执行器代码可以使用累加器的+=方法(在Java中是add)增加累加器的值,驱动器程序可以调用累加器的value属性(在Java中使用value()或setValue())来访问累加器的值.工作节点上任务不能访问累加器值,从这些任务的角度来看,累加器是一个只写变量.
对于要在行动操作中使用累加器,Spark只会把每个任务对各累加器的修改应用一次,因此,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,必须把它放在foreach()这样的行动操作中,转化操作中累加器可能会发生不止一次更新.
package com.geekparkhub.core.spark.application.methods
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}
/**
* Geek International Park | 极客国际公园
* GeekParkHub | 极客实验室
* Website | https://www.geekparkhub.com/
* Description | Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见
* HackerParkHub | 黑客公园枢纽
* Website | https://www.hackerparkhub.org/
* Description | 以无所畏惧的探索精神 开创未知技术与对技术的崇拜
* GeekDeveloper : JEEP-711
*
* @author system
* <p>
* AccuAction
* <p>
*/
object AccuAction {
def main(args: Array[String]): Unit = {
// 创建SpakConf
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("AccuAction")
// 创建SC
val sc = new SparkContext(sparkConf)
// 累加器
val sum: LongAccumulator = sc.longAccumulator("sum")
// 创建RDD
val value: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4))
val word: RDD[(Int, Int)] = value.map(x => {
// 添加累加
sum.add(x)
(x, 1)
})
word.collect().foreach(println)
println(sum.value)
// 关闭资源
sc.stop()
}
}
自定义累加器类型功能在1.X版本中就已经提供,但是使用起来比较麻烦,在2.0版本后,累加器的易用性有了较大改进,而且官方还提供了一个新抽象类 :
AccumulatorV2来提供更加友好自定义类型累加器的实现方式,实现自定义类型累加器需要继承AccumulatorV2并至少覆写下例中出现的方法,
package com.geekparkhub.core.spark.application.methods
import org.apache.spark.util.AccumulatorV2
/**
* Geek International Park | 极客国际公园
* GeekParkHub | 极客实验室
* Website | https://www.geekparkhub.com/
* Description | Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见
* HackerParkHub | 黑客公园枢纽
* Website | https://www.hackerparkhub.org/
* Description | 以无所畏惧的探索精神 开创未知技术与对技术的崇拜
* GeekDeveloper : JEEP-711
*
* @author system
* <p>
* AccumulatorAction
* <p>
*/
class AccumulatorAction extends AccumulatorV2[Int,Int]{
var sum = 0
// 判断是否为空
override def isZero: Boolean = sum == 0
// 复制方法
override def copy(): AccumulatorV2[Int, Int] = {
val accumulatorAction = new AccumulatorAction
accumulatorAction.sum = this.sum
accumulatorAction
}
// 重置方法
override def reset(): Unit = 0
// 累加方法
override def add(v: Int): Unit = sum += v
// 合并方法
override def merge(other: AccumulatorV2[Int, Int]): Unit = sum += other.value
// 返回值
override def value: Int = sum
}
广播变量用来高效分发较大对象,向所有工作节点发送一个较大的只读值,以供一个或多个Spark操作使用.
比如,如果应用需要向所有节点发送一个较大的只读查询表,甚至是机器学习算法中的一个很大的特征向量,广播变量用起来都很顺手,在多个并行操作中使用同一个变量,但是Spark会为每个任务分别发送.
使用广播变量过程 :
1.通过对一个类型T的对象调用SparkContext.broadcast创建出Broadcast[T]对象,任何可序列化类型都可以这么实现.
2.通过value属性访问该对象值(在Java中为value()方法).
3.变量只会被发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点).
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
scala>
Spark SQL是Spark用来处理结构化数据模块,它提供了2个编程抽象 :
DataFrame和DataSet,并且作为分布式SQL查询引擎作用.已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduc程序复杂性,由于MapReduce计算模型执行效率比较慢,所以Spark SQL应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快.
易整合
统一数据访问方式
兼容Hive
标准数据连接
与RDD类似,DataFrame也是一个分布式数据容器,然而DataFrame更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema,同时与Hive类似,DataFrame也支持嵌套数据类型(struct / array / map).
从API易用性角度上看,DataFrame API提供是一套高层的关系操作,比函数式RDD API要更加友好,门槛更低.
上图直观地体现了DataFrame和RDD区别,左侧RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解Person类内部结构,而右侧DataFrame却提供了详细的结构信息,使得Spark SQL可以清楚地知道该数据集中包含哪些列,每列名称和类型各是什么,DataFrame是为数据提供了Schema视图,可以把它当做数据库中一张数据表.
DataFrame也是懒执行,性能上比RDD要高要原因 : 优化执行计划,查询计划通过Spark
catalyst optimiser进行优化.
为了说明查询优化,上图展示的人口数据分析示例,图中构造了两个DataFrame,将它们join之后又做了一次filter操作,如果原封不动地执行这个执行计划,最终的执行效率是不高的,因为join是一个代价较大操作,也可能会产生一个较大数据集,如果能将filter下推到join下方,先对DataFrame进行过滤,再join过滤后的较小的结果集,便可以有效缩短执行时间.
而Spark SQL的查询优化器正是这样做的,简而言之逻辑查询计划优化就是一个利用基于关系代数的等价变换,将高成本的操作替换为低成本操作的过程.
1.DataSet是DataframeAPI扩展,是SparkSQL最新数据抽象.
2.友好API风格,既具有类型安全检查也具有Dataframe的查询优化特性.
3.Dataset支持编解码器,当需要访问非堆上的数据时可以避免反序列化整个对象,提高了效率.
4.样例类被用来在Dataset中定义数据结构信息,样例类中每个属性的名称直接映射到DataSet中的字段名称.
5.Dataframe是Dataset的特列,
DataFrame=Dataset[Row],所以可以通过as方法将Dataframe转换为Dataset,Row是一个类型,跟Car / Person这些类型一样,所有表结构信息都用Row来表示.6.DataSet是强类型,比如可以有
Dataset[Car],Dataset[Person].7.DataFrame只是知道字段,但是不知道字段类型,所以在执行这些操作时是没办法在编译的时候检查是否类型失败,比如可以对一个String进行减法操作,在执行时才报错,而DataSet不仅仅知道字段,而且知道字段类型,所以有更严格的错误检查,就跟JSON对象和类对象之间的类比.
在老版本中,SparkSQL提供两种SQL查询起始点 :
SQLContext : 用于Spark提供SQL查询.
HiveContext : 用于连接Hive查询.SparkSession是Spark最新SQL查询起始点,实质上是SQLContext和HiveContext组合,所以在SQLContext和HiveContext上可用API在SparkSession上同样是可以使用,SparkSession内部封装了
sparkContext,所以计算实际上是由sparkContext完成.
在SparkSQL中
SparkSession是创建DataFrame和执行SQL入口.
创建DataFrame有三种方式 :
1.通过Spark数据源进行创建.
2.从已存在的RDD进行转换.
3.从Hive Table进行查询返回.
1.从Spark数据源进行创建
查看Spark数据源进行创建文件格式
scala> spark.read.
csv jdbc load options parquet table textFile
format json option orc schema text
scala> spark.read.
2.读取json文件创建DataFrame展示结果
scala> val jsonflow = spark.read.json("hdfs://systemhub511:9000/core_flow/spark/json/001/people.json")
jsonflow: org.apache.spark.sql.DataFrame = [age: bigint, name: string]=
scala> jsonflow.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala>
3.RDD进行转换 | 轻轻1.4.2.5
4.Hive Table进行查询返回 |
对DataFrame创建临时表
临时表是Session范围内,Session退出后,表就会失效,如果想应用范围内有效,可以使用全局表,注意使用全局表时需要全路径访问,如 : global_temp.people
scala> jsonflow.createTempView("people")
通过SQL语句实现查询全表结果展示
scala> val sqlDF = spark.sql("SELECT * FROM people")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> sqlDF.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala>
对于DataFrame创建全局表
scala> jsonflow.createGlobalTempView("peoples")
通过SQL语句实现查询全表结果展示
scala> spark.sql("SELECT * FROM global_temp.peoples").show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala> spark.newSession().sql("SELECT * FROM global_temp.peoples").show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala>
查看DataFrame Schema信息
scala> jsonflow.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
scala>
只查看name列数据
scala> jsonflow.select("name").show
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
scala>
查看name列数据以及age+1数据
scala> jsonflow.select($"name",$"age" + 1).show()
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+
scala>
查看age大于21数据
scala> jsonflow.filter($"age" > 21).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
scala>
按照age分组,查看数据条数
scala> jsonflow.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
| 19| 1|
|null| 1|
| 30| 1|
+----+-----+
scala>
如果需要RDD与DF或者DS之间操作,需要引入
import spark.implicits._
spark并不是包名,而是sparkSession对象名称.
导入隐式转换并创建RDD
scala> import spark.implicits._
import spark.implicits._
scala> val peopleRDD = sc.textFile("hdfs://systemhub511:9000/core_flow/spark/input/wordcount/wordcount_001.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = hdfs://systemhub511:9000/core_flow/spark/input/wordcount/wordcount_001.txt MapPartitionsRDD[30] at textFile at <console>:27
scala>
1.通过手动转换
scala> peopleRDD.map{x=>{val split = x.split(",");(split(0),split(1).trim)}}.toDF("name","age")
res11: org.apache.spark.sql.DataFrame = [name: string, age: string]
scala>
2.通过反射转换 (需要用到样例类)
创建样例类,根据样例类将RDD转换为DataFrame
scala> case class People(name:String, age:Int)
defined class People
scala> peopleRDD.map{x=>{val split = x.split(",");People(split(0),split(1).trim.toInt)}}.toDF
res17: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> res17.toDF
res18: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala>
3.通过编程方式转换
导入所需类型
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala>
创建Schema
scala> val structType: StructType = StructType(StructField("name",StringType) :: StructField("age",IntegerType) :: Nil)
structType: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,true))
scala>
导入所需类型
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
scala>
根据指定类型创建二元组RDD
scala> val data = peopleRDD.map{x => val para = x.split(",");Row(para(0),para(1).trim.toInt)}
scala>
根据数据及指定schema创建DataFrame
scala> val dataFrame = spark.createDataFrame(data, structType)
Create SqlAction.scala
package com.geekparkhub.core.spark.application.sparksql
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
/**
* Geek International Park | 极客国际公园
* GeekParkHub | 极客实验室
* Website | https://www.geekparkhub.com/
* Description | Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见
* HackerParkHub | 黑客公园枢纽
* Website | https://www.hackerparkhub.org/
* Description | 以无所畏惧的探索精神 开创未知技术与对技术的崇拜
* GeekDeveloper : JEEP-711
*
* @author system
* <p>
* SqlAction
* <p>
*/
object SqlAction {
def main(args: Array[String]): Unit = {
// 创建SparkSession
val sparkSession: SparkSession = SparkSession
.builder().master("local[*]").appName("SqlAction").getOrCreate()
// 创建SC
val sc: SparkContext = sparkSession.sparkContext
// 创建 RDD
val rdd: RDD[Int] = sc.parallelize(Array(1,2,3,4,5))
// 将Int类型RDD转换为Row类型RDD
val rowRDD: RDD[Row] = rdd.map(x => {Row(x)})
// 数据输出
rowRDD.collect().foreach(println)
// 创建元数据信息
val structType = new StructType
val structTypes: StructType = structType.add(StructField("id", IntegerType))
val dataFrame: DataFrame = sparkSession.createDataFrame(rowRDD,structTypes)
// 导入隐式转换
import sparkSession.implicits._
// DSL风格 数据查询
dataFrame.select("id").show()
// 关闭资源
sparkSession.stop()
}
}
直接调用rdd即可.
创建DataFrame
scala> val df = spark.read.json("/core_flow/spark/json/001/people.json")df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala>
将DataFrame转换为RDD
scala> val dfToRDD = df.rdd
dfToRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[6] at rdd at <console>:29
scala>
打印RDD
scala> dfToRDD.collect
res0: Array[org.apache.spark.sql.Row] = Array([null,Michael], [30,Andy], [19,Justin])
scala>
Dataset是具有强类型的数据集合,需要提供对应类型信息.
创建样例类
scala> case class Person(name: String, age: Long)
defined class Person
scala>
创建DataSet
scala> val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
scala>
查看结果
scala> caseClassDS.show
+----+---+
|name|age|
+----+---+
|Andy| 32|
+----+---+
scala>
SparkSQL能够自动将包含有case类RDD转换成DataFrame,case类定义了table结构,case类属性通过反射变成了表列名,Case类可以包含诸如Seqs或者Array等复杂结构.
创建RDD
scala> val peopleRDD = sc.textFile("examples/src/main/resources/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[8] at textFile at <console>:28
scala>
创建样例类
scala> case class Person(name: String, age: Long)
defined class Person
scala>
将RDD转化为DataSet
scala> peopleRDD.map(line => {val para = line.split(",");Person(para(0),para(1).trim.toInt)}).toDS
res2: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
scala>
调用rdd方法即可.
创建一个DataSet
scala> val DS= Seq(Person("Andy", 32)).toDS()
DS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
scala>
将DataSet转换为RDD
scala> DS.rdd
res3: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[12] at rdd at <console>:28
scala> res3.collect
res4: Array[Person] = Array(Person(Andy,32))
scala>
此方法就是在给出每一列类型后,使用as方法转成Dataset,这在数据类型是DataFrame又需要针对各个字段处理时极为方便,在使用一些特殊的操作时,一定要加上import spark.implicits._不然toDF、toDS无法使用.
创建DateFrame
scala> val df = spark.read.json("./examples/src/main/resources/people.json")
创建样例类
scala> case class Person(name: String, age: Long)
defined class Person
scala>
将DateFrame转化为DataSet
scala> df.as[Person]
res14: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]
scala>
创建样例类
scala> case class Person(name: String, age: Long)
defined class Person
scala>
创建DataSet
scala> val ds = Seq(Person("Andy", 32)).toDS()
ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
scala>
将DataSet转化为DataFrame并展示结果
scala> val df = ds.toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
scala> df.show
+----+---+
|name|age|
+----+---+
|Andy| 32|
+----+---+
scala>
在SparkSQL中Spark为提供了两个新抽象,分别是
DataFrame和DataSet.
他们和RDD有什么区别? 首先从版本的产生上来看 :
RDD (Spark1.0) —> Dataframe(Spark1.3) —> Dataset(Spark1.6)
如果同样数据都给到这三个数据结构,他们分别计算之后,都会给出相同结果,不同是执行效率和执行方式.
在后期Spark版本中,DataSet会逐步取代RDD和DataFrame成为唯一的API接口.
1.RDD / DataFrame / Dataset全都是spark平台下分布式弹性数据集,为处理超大型数据提供便利.
2.三者都有惰性机制,在进行创建 / 转换,如map方法时不会立即执行,只有在遇到Action如foreach时,三者才会开始遍历运算.
3.三者都会根据spark内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出.
4.三者都有partition概念.
5.三者有许多共同函数,如filter,排序等.
6.在对DataFrame和Dataset进行操作许多操作都需要这个包进行支持importspark.implicits._
7.DataFrame和Dataset均可使用模式匹配获取各个字段值和类型.
DataFrame :
DF.map{
caseRow(col1:String,col2:Int)=>
println(col1);println(col2)
col1
case_=> ""
}
Dataset :
// 定义字段名和类型
caseclassColtest(col1:String,col2:Int)extendsSerializable
DS.map{
caseColtest(col1:String,col2:Int)=>
println(col1);println(col2)
col1
case_=> ""
}
1. RDD :
RDD一般和spark mlib同时使用
RDD不支持sparksql操作
2. DataFrame
与RDD和Dataset不同,DataFrame每一行类型固定为Row,每一列的值没法直接访问,只有通过解析才能获取各个字段值.
DF.foreach{
line=>
valcol1=line.getAs[String]("col1")
valcol2=line.getAs[String]("col2")
}
DataFrame与Dataset一般不与spark mlib同时使用
DataFrame与Dataset均支持sparksql操作,比如select,groupby,还能注册临时表/视窗,进行sql语句操作.
DataFrame与Dataset支持一些特别方便保存方式,比如保存成csv,可以带上表头,这样每一列字段名一目了然.
3.Dataset`
Dataset和DataFrame拥有完全相同的成员函数,区别只是每一行数据类型不同.
DataFrame也可以叫Dataset[Row],每一行的类型是Row,不解析每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到getAS方法或者共性中的第七条提到的模式匹配拿出特定字段,而Dataset中,每一行是什么类型是不一定,在自定义了case class之后可以很自由获得每一行信息.
Dataset在需要访问列中某个字段时是非常方便,然而如果要写一些适配性很强函数时,如果使用Dataset,行类型又不确定,可能是各种case class,无法实现适配,这时候用DataFrame即Dataset[Row]就能比较好解决问题.
SQL & DSL风格数据查询
package com.geekparkhub.core.spark.application.sparksql
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* Geek International Park | 极客国际公园
* GeekParkHub | 极客实验室
* Website | https://www.geekparkhub.com/
* Description | Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见
* HackerParkHub | 黑客公园枢纽
* Website | https://www.hackerparkhub.org/
* Description | 以无所畏惧的探索精神 开创未知技术与对技术的崇拜
* GeekDeveloper : JEEP-711
*
* @author system
* <p>
* SqlAction
* <p>
*/
object SqlAction {
def main(args: Array[String]): Unit = {
// 创建SparkSession
val sparkSession: SparkSession = SparkSession
.builder().master("local[*]").appName("SqlAction").getOrCreate()
// 导入隐式转换
import sparkSession.implicits._
// 创建DF
val df: DataFrame = sparkSession.read.json("/Volumes/GEEK-SYSTEM/Technical_Framework/spark/projects/spark_server/spark-sql/data/people.json")
// SQL风格 数据查询 | 创建临时表
df.createTempView("PEOPLE")
sparkSession.sql("SELECT * FROM PEOPLE").show()
// DSL风格 数据查询
df.select("name").show()
// 关闭资源
sparkSession.stop()
}
}
在Shell窗口中可通过spark.udf功能自定义函数.
创建DF
scala> val df = spark.read.json("hdfs://systemhub511:9000/core_flow/spark/json/001/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala>
注册UDF
scala> spark.udf.register("addName",(x:String) => "Name:" + x)
res1: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
scala>
创建数据表
scala> df.createOrReplaceTempView("people")
查询数据表
scala> spark.sql("Select addName(name),age from people").show()
+-----------------+----+
|UDF:addName(name)| age|
+-----------------+----+
| Name:Michael|null|
| Name:Andy| 30|
| Name:Justin| 19|
+-----------------+----+
scala>
强类型Dataset和弱类型DataFrame都提供了相关聚合函数,如count(),countDistinct(),avg(),max(),min(),除此之外还可以设定自定义聚合函数.
弱类型自定义聚合函数 : 通过继承UserDefinedAggregateFunction来实现自定义聚合函数.
下面展示 求平均工资自定义聚合函数
Create AvgAction.scala
package com.geekparkhub.core.spark.application.aggregation
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, LongType, StructField, StructType}
/**
* Geek International Park | 极客国际公园
* GeekParkHub | 极客实验室
* Website | https://www.geekparkhub.com/
* Description | Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见
* HackerParkHub | 黑客公园枢纽
* Website | https://www.hackerparkhub.org/
* Description | 以无所畏惧的探索精神 开创未知技术与对技术的崇拜
* GeekDeveloper : JEEP-711
*
* @author system
* <p>
* AvgAction
* <p>
*/
object AvgAction extends UserDefinedAggregateFunction {
// 定义输入数据类型
override def inputSchema: StructType = StructType(StructField("input", LongType) :: Nil)
// 缓存中间值类型
override def bufferSchema: StructType = StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
// 定义输出数据类型
override def dataType: DataType = DoubleType
// 函数稳定参数
override def deterministic: Boolean = true
// 初始化缓存数据
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0L
buffer(1) = 0L
}
// 在执行器之内更新
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0) = buffer.getLong(0) + input.getLong(0)
buffer(1) = buffer.getLong(1) + 1L
}
// 在执行器之外合并
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
// 执行数据计算
override def evaluate(buffer: Row): Any = buffer.getLong(0).toDouble / buffer.getLong(1)
}
Create UdafAction.scala
package com.geekparkhub.core.spark.application.aggregation
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* Geek International Park | 极客国际公园
* GeekParkHub | 极客实验室
* Website | https://www.geekparkhub.com/
* Description | Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见
* HackerParkHub | 黑客公园枢纽
* Website | https://www.hackerparkhub.org/
* Description | 以无所畏惧的探索精神 开创未知技术与对技术的崇拜
* GeekDeveloper : JEEP-711
*
* @author system
* <p>
* UdafAction
* <p>
*/
object UdafAction {
def main(args: Array[String]): Unit = {
// 创建SparkSession
val sparkSession: SparkSession = SparkSession
.builder().master("local[*]").appName("UdafAction").getOrCreate()
// 创建DF
val df: DataFrame = sparkSession.read.json("/Volumes/GEEK-SYSTEM/Technical_Framework/spark/projects/spark_server/spark-sql/data/people.json")
// SQL风格 数据查询 | 创建临时表
df.createTempView("PEOPLE")
// 注册自定义函数
sparkSession.udf.register("AvgAction", AvgAction)
// 使用自定义函数
sparkSession.sql("SELECT AvgAction(age) FROM PEOPLE").show()
// 关闭资源
sparkSession.stop()
}
}
强类型自定义聚合函数 : 通过继承Aggregator来实现强类型自定义聚合函数,同样是求平均工资.
Spark SQL DataFrame接口支持多种数据源操作,一个DataFrame可以进行RDDs方式操作,也可以被注册为临时表,把DataFrame注册为临时表之后,就可以对该DataFrame执行SQL查询.
Spark SQL默认数据源为Parquet格式,数据源为Parquet文件时,Spark SQL可以方便执行所有操作,修改配置项spark.sql.sources.default,可修改默认数据源格式 :
scala> val df = spark.read.load("examples/src/main/resources/users.parquet") df.select("name","favorite_color").write.save("namesAndFavColors.parquet")
scala>
当数据源格式不是parquet格式文件时,需要手动指定数据源格式,数据源格式需要指定全名(例如:org.apache.spark.sql.parquet),如果数据源格式为内置格式,则只需要指定简称定json,parquet,jdbc,orc,libsvm,csv,text来指定数据格式
可以通过SparkSession提供的read.load方法用于通用加载数据,使用write和save保存数据.
scala> val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
scala> peopleDF.write.format("parquet").save("hdfs://hadoop102:9000/namesAndAges.parquet")
scala>
除此之外,可以直接运行SQL在文件上.
scala> val sqlDF = spark.sql("SELECT * FROM parquet.`hdfs://systemhub511:9000/namesAndAges.parquet`")
scala> sqlDF.show()
scala> val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> peopleDF.write.format("parquet").save("hdfs://hadoop102:9000/namesAndAges.parquet")
scala> peopleDF.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala> val sqlDF = spark.sql("SELECT * FROM parquet.`hdfs://systemhub511:9000/namesAndAges.parquet`")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> sqlDF.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
可以采用SaveMode执行存储操作,SaveMode定义了对数据处理模式,需要注意的是,这些保存模式不使用任何锁定,不是原子操作,此外当使用Overwrite方式执行时,在输出新数据之前原数据就已经被删除,SaveMode详细介绍如下表 :
| Scala / Java | Any Language | Meaning |
| SaveMode.ErrorIfExists(default) | “error”(default) | 如果文件存在,则报错 |
| SaveMode.Append | “append” | 追加 |
| SaveMode.Overwrite | “overwrite” | 覆写 |
| SaveMode.Ignore | “ignore” | 数据存在,则忽略 |
源码出处
org.apache.spark.sql.DataFrameWriter & org.apache.spark.sql.SaveMode
/**
* Specifies the behavior when data or table already exists. Options include:
* - `overwrite`: overwrite the existing data.
* - `append`: append the data.
* - `ignore`: ignore the operation (i.e. no-op).
* - `error`: default option, throw an exception at runtime.
*
* @since 1.4.0
*/
def mode(saveMode: String): DataFrameWriter[T] = {
this.mode = saveMode.toLowerCase match {
case "overwrite" => SaveMode.Overwrite
case "append" => SaveMode.Append
case "ignore" => SaveMode.Ignore
case "error" | "default" => SaveMode.ErrorIfExists
case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " + "Accepted save modes are 'overwrite', 'append', 'ignore', 'error'.")
}
this
}
Spark SQL能够自动推测JSON数据集结构,并将它加载为Dataset[Row],可以通过SparkSession.read.json()加载JSON 文件.
JSON文件不是一个传统JSON文件,而是每一行都得是一个JSON串.
scala> import spark.implicits._
scala> val path = "examples/src/main/resources/people.json"
scala> val peopleDF = spark.read.json(path)
scala> peopleDF.createOrReplaceTempView("people")
scala> val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
scala> teenagerNamesDF.show()
+------+
| name|
+------+
|Justin|
+------+
scala> val otherPeopleDataset = spark.createDataset("""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
scala> val otherPeople = spark.read.json(otherPeopleDataset)
scala> otherPeople.show()
+---------------+----+
| address | name |
+---------------+----+
|[Columbus,Ohio]| Yin|
Parquet是一种流行列式存储格式,可以高效地存储具有嵌套字段记录,Parquet格式经常在Hadoop生态圈中被使用,它也支持Spark SQL全部数据类型,Spark SQL提供了直接读取和存储Parquet格式文件的方法.
scala> importing spark.implicits._
scala> import spark.implicits._
scala> val peopleDF = spark.read.json("examples/src/main/resources/people.json")
scala> peopleDF.write.parquet("hdfs://systemhub511:9000/people.parquet")
scala> val parquetFileDF = spark.read.parquet("hdfs://systemhub511:9000/people.parquet")
scala> parquetFileDF.createOrReplaceTempView("parquetFile")
scala> val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
scala> namesDF.map(attributes => "Name: " + attributes(0)).show()
+------------+
| value|
+------------+
|Name: Justin|
+------------+
Spark SQL可以通过JDBC从关系型数据库中读取数据方式创建DataFrame,通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中.
注意,需要将相关数据库驱动放到spark类路径下.
[root@systemhub711 ~]# cp /opt/software/mysql-libs/mysql-connector-java-5.1.27/mysql-connector-java-5.1.27-bin.jar /opt/module/spark/jars/
scala> val jdbcDF = spark.read.format("jdbc").option("url","jdbc:mysql://systemhub711:3306/company").option("dbtable","staff").option("user","root").option("password","ax01465").load()
jdbcDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]
scala> jdbcDF.show
+---+-------+------+
| id| name| sex|
+---+-------+------+
| 1|test001| male|
| 2|test002|female|
| 3|test003|female|
| 4|test004| male|
| 5|test005|female|
| 6|test006| male|
| 7|test007|female|
| 8|test008|female|
| 9|test009|female|
| 10|test010|female|
| 11|test011|female|
| 12|test012| male|
| 13| Female| null|
| 14| Male| null|
| 15| Female| null|
+---+-------+------+
scala>
scala> jdbcDF.write.format("jdbc").option("url", "jdbc:mysql://systemhub711:3306/company").option("dbtable","rddtable2").option("user","root").option("password","ax01465").save()
Apache Hive是Hadoop上SQL引擎,Spark SQL编译时可以包含Hive支持,也可以不包含,包含Hive支持的Spark SQL可以支持Hive表访问、UDF(用户自定义函数)以及Hive查询语言(HiveQL/HQL)等,需要强调的一点是,如果要在Spark SQL中包含Hive库,并不需要事先安装Hive,一般来说,最好还是在编译Spark SQL时引入Hive支持,这样就可以使用这些特性了,如果下载的是二进制版本Spark,它应该已经在编译时添加了Hive支持.
若要把Spark SQL连接到一个部署好Hive上,你必须把hive-site.xml复制到Spark配置文件目录中
($SPARK_HOME/conf),即使没有部署好Hive,Spark SQL也可以运行,需要注意的是,如果没有部署好Hive,Spark SQL会在当前工作目录中创建出Hive元数据仓库,叫作metastore_db,此外如果尝试使用HiveQL中的CREATE TABLE (并非CREATE EXTERNAL TABLE)语句来创建表,这些表会被放在默认的文件系统中的/user/hive/warehouse目录中(如果classpath中有配好的hdfs-site.xml,默认文件系统就是HDFS,否则就是本地文件系统).
如果要使用内嵌Hive,什么都不用做,直接用就可以了,--conf:spark.sql.warehouse.dir=
如果使用是内部的Hive,在Spark2.0之后,spark.sql.warehouse.dir用于指定数据仓库地址,如果需要是用HDFS作为路径,那么需要将core-site.xml和hdfs-site.xml加入到Spark conf目录,否则只会创建master节点上warehouse目录,查询时会出现文件找不到的问题,这是需要向使用HDFS,则需要将metastore删除,重启集群.
scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+
scala> spark.sql("create table hivetest(id int)")
19/06/03 02:03:17 WARN metastore.HiveMetaStore: Location: file:/opt/module/spark/spark-warehouse/hivetest specified for non-external table:hivetest
res5: org.apache.spark.sql.DataFrame = []
scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default| hivetest| false|
+--------+---------+-----------+
scala> spark.sql("select * from hivetest").show()
+---+
| id|
+---+
+---+
scala>
如果想连接外部已经部署好的Hive,需要通过以下几个步骤 :
1.启动Hive服务
[root@systemhub711 spark]# /opt/module/hive/bin/hive
2.将Hive中的hive-site.xml拷贝或者软连接到Spark安装目录下conf目录下
[root@systemhub711 spark]# cp /opt/module/hive/conf/hive-site.xml ./conf/
3.开启spark shell终端 | 显示所有数据表并查看某张数据表数据
scala> spark.sql("show tables").show
+--------+--------------------+-----------+
|database| tableName|isTemporary|
+--------+--------------------+-----------+
| default| business| false|
| default| dept| false|
| default| dept_partition| false|
| default| emp| false|
| default| emp_sex| false|
| default|hive_hbase_emp_table| false|
| default| hive_workflow| false|
| default| location| false|
| default| movie_info| false|
| default|multitasking_hive...| false|
| default| person_info| false|
| default| relevance_hbase_emp| false|
| default| score| false|
| default| staff_hive| false|
| default| test| false|
| default| test001| false|
| default| test002| false|
| default| test003| false|
| default| test004| false|
| default| test005| false|
+--------+--------------------+-----------+
only showing top 20 rows
scala> spark.sql("select * from emp").show
+-----+-----+---------+----+----------+--------+-----+------+
|empno|ename| job| mgr| hiredate| sal| comm|deptno|
+-----+-----+---------+----+----------+--------+-----+------+
| 7369|SMITH|CLERKSKLD|7902|1980-12-17| 800.0| 20.0| null|
| 7499|ALLTE|SALESMANS|7689|1987-02-23| 1600.0|300.0| 30|
| 7521|WAROS|SJDHHJDJX|7869|1984-06-12| 1250.18|500.0| 30|
| 7566|JOSSS|JDHYHDSDS|4545|1874-05-15| 2894.25| 20.0| null|
| 7654|SOCTD|MANSJUSSD|4855|1996-02-14| 2852.3| 30.0| null|
| 7698|ADAMS|JUSHHWESD|4552|1985-05-16|25524.02| 30.0| null|
| 7782|JAMSK|KIHNGSEHN|7769|1991-06-23| 1100.0| 20.0| null|
| 7788|FOESS|CLAEDFDFD|7698|1994-09-17| 950.0| 30.0| null|
| 7939|KINGS|CLADDJHEW|7566|1993-07-12| 3000.0| 20.0| null|
+-----+-----+---------+----+----------+--------+-----+------+
scala>
Spark SQL CLI可以很方便在本地运行Hive元数据服务以及从命令行执行查询任务.
在Spark目录下执行如下命令启动Spark SQL CLI
[root@systemhub711 spark]# bin/spark-sql
spark-sql (default)> show tables;
database tableName isTemporary
default business false
default dept false
default dept_partition false
default emp false
default emp_sex false
default hive_hbase_emp_table false
default hive_workflow false
default location false
default movie_info false
default multitasking_hive_workflow false
default person_info false
default relevance_hbase_emp false
default score false
default staff_hive false
default test false
default test001 false
default test002 false
default test003 false
default test004 false
default test005 false
default test006 false
default test007 false
default test008 false
default test_buck false
default test_bucket false
Time taken: 6.2 seconds, Fetched 25 row(s)
19/06/03 02:17:58 INFO CliDriver: Time taken: 6.2 seconds, Fetched 25 row(s)
spark-sql (default)>
pom.xml 公共依赖信息
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.geekparkhub.core.spark</groupId>
<artifactId>spark_server</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>spark-common</module>
<module>spark-core</module>
<module>spark-sql</module>
</modules>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.15</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.1</version>
</dependency>
</dependencies>
</project>
Create SparkHiveAction.scala
package com.geekparkhub.core.spark.application.sparksql
import org.apache.spark.sql.SparkSession
/**
* Geek International Park | 极客国际公园
* GeekParkHub | 极客实验室
* Website | https://www.geekparkhub.com/
* Description | Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见
* HackerParkHub | 黑客公园枢纽
* Website | https://www.hackerparkhub.org/
* Description | 以无所畏惧的探索精神 开创未知技术与对技术的崇拜
* GeekDeveloper : JEEP-711
*
* @author system
* <p>
* SparkHiveAction
* <p>
*/
object SparkHiveAction {
def main(args: Array[String]): Unit = {
// 创建SparkSession
val sparkSession: SparkSession = SparkSession
.builder()
.enableHiveSupport()
.master("local[*]")
.appName("SparkHiveAction")
.getOrCreate()
// 展示数据表信息
sparkSession.sql("show tables").show()
// 关闭资源
sparkSession.stop()
}
}
运行查看结果
+--------+--------------------+-----------+
|database| tableName|isTemporary|
+--------+--------------------+-----------+
| default| business| false|
| default| dept| false|
| default| dept_partition| false|
| default| emp| false|
| default| emp_sex| false|
| default|hive_hbase_emp_table| false|
| default| hive_workflow| false|
| default| location| false|
| default| movie_info| false|
| default|multitasking_hive...| false|
| default| person_info| false|
| default| relevance_hbase_emp| false|
| default| score| false|
| default| staff_hive| false|
| default| test| false|
| default| test001| false|
| default| test002| false|
| default| test003| false|
| default| test004| false|
| default| test005| false|
+--------+--------------------+-----------+
only showing top 20 rows
Spark Streaming用于流式数据处理,Spark Streaming支持数据输入源很多,例如 : Kafka / Flume / Twitter / ZeroMQ和简单TCP套接字等等,数据输入后可以用Spark高度抽象原语如 : map / reduce / join / window等进行运算,而结果也能保存在很多地方,如HDFS,数据库等,另外Spark Streaming也能和MLlib(机器学习)以及Graphx完美融合.
Spark Streaming和Spark基于RDD概念很相似,Spark Streaming使用
离散化流(discretized stream)作为抽象表示,叫作DStream,DStream是随时间推移而收到的数据序列,在内部每个时间区间收到数据都作为RDD存在,而DStream是由这些RDD所组成的序列(因此得名“离散化”).DStream可以从各种输入源创建,比如Flume / Kafka或者HDFS,创建出来DStream支持两种操作,一种是转化操作(
transformation),会生成一个新的DStream,另一种是输出操作(output operation),可以把数据写入外部系统中,DStream提供了许多与RDD所支持的操作相类似操作支持,还增加了与时间相关新操作,比如滑动窗口.
1.易用
2.容错
3.易整合到Spark体系
1.需求 : 使用netcat工具向9999端口不断发送数据,通过SparkStreaming读取端口数据并统计不同单词出现次数.
2.追加依赖信息
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.1</version>
</dependency>
Create StreamWordCounAction.scala
package com.geekparkhub.core.spark.application.example
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Geek International Park | 极客国际公园
* GeekParkHub | 极客实验室
* Website | https://www.geekparkhub.com/
* Description | Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见
* HackerParkHub | 黑客公园枢纽
* Website | https://www.hackerparkhub.org/
* Description | 以无所畏惧的探索精神 开创未知技术与对技术的崇拜
* GeekDeveloper : JEEP-711
*
* @author system
* <p>
* StreamWordCounAction
* <p>
*/
object StreamWordCounAction {
def main(args: Array[String]): Unit = {
// 创建 SparkConf
val sc: SparkConf = new SparkConf().setMaster("loacl[*]").setAppName("StreamWordCounAction")
//创建 StreamingContext
val ssc = new StreamingContext(sc, Seconds(3))
// 创建 DStream
val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("systemhub511", 9999)
// 将行数据转换为单词
val wordDStream: DStream[String] = lineDStream.flatMap(_.split(" "))
// 将单词住转换为元祖
val wordAndOneDStream: DStream[(String, Int)] = wordDStream.map((_, 1))
// 统计单词出现个数
val DStreamResult: DStream[(String, Int)] = wordAndOneDStream.reduceByKey(_ + _)
// 输出日志信息
DStreamResult.print()
// 启动流式任务
ssc.start()
ssc.awaitTermination()
}
}
4.启动Hadoop集群服务(包括Spark服务)
[root@systemhub511 ~]# start-cluster.sh
[root@systemhub511 spark]# sbin/start-all.sh
[root@systemhub511 spark]# sbin/start-history-server.sh
5.启动程序并通过NetCat发送数据
[root@systemhub511 spark]# nc -lk 9999
hello
spark
io
io
io
6.查看日志信息
-------------------------------------------
Time: 1559563323000 ms
-------------------------------------------
(hello,1)
-------------------------------------------
Time: 1559563326000 ms
-------------------------------------------
(spark,1)
-------------------------------------------
Time: 1559563329000 ms
-------------------------------------------
Time: 1559563341000 ms
-------------------------------------------
(io,1)
-------------------------------------------
Time: 1559563344000 ms
-------------------------------------------
(io,2)
Discretized Stream是Spark Streaming基础抽象,代表持续性数据流和经过各种Spark原语操作后的结果数据流,在内部实现上DStream是一系列连续的RDD来表示,每个RDD含有一段时间间隔内的数据.
对数据操作也是按照RDD为单位来进行
计算过程由Spark engine来完成
Spark Streaming原生支持一些不同数据源,一些核心数据源已经被打包到Spark Streaming的Maven工件中,而其他一些则可以通过spark-streaming-kafka等附加工件获取,每个接收器都以Spark执行器程序中一个长期运行的任务形式运行,因此会占据分配给应用CPU核心.
此外还需要有可用的CPU核心来处理数据,这意味着如果要运行多个接收器,就必须至少有和接收器数目相同的核心数,还要加上用来完成计算所需要的核心数,例如如果想要在流计算应用中运行10个接收器,那么至少需要为应用分配11个CPU核心,所以如果在本地模式运行,不要使用
local或者local[1]
文件数据流 : 能够读取所有HDFS API兼容文件系统文件,通过
fileStream方法进行读取,Spark Streaming将会监控dataDirectory目录并不断处理移动进来的文件,但是目前不支持嵌套目录.
注意事项 :
1.文件需要有相同数据格式
2.文件进入dataDirectory方式需要通过移动或者重命名来实现
3.一旦文件移动进目录,则不能再修改,即便修改也不会读取新数据
streamingContext.textFileStream(dataDirectory)
1.在HDFS上创建用于被监听目录
[root@systemhub511 spark]# hadoop fs -mkdir /core_flow/spark/filestream
2.创建三个文件
[root@systemhub511 filestream]# vim a.txt
[root@systemhub511 filestream]# vim b.txt
[root@systemhub511 filestream]# vim c.txt
3.Create FileStreamAction.scala
package com.geekparkhub.core.spark.application.datastream
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Geek International Park | 极客国际公园
* GeekParkHub | 极客实验室
* Website | https://www.geekparkhub.com/
* Description | Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见
* HackerParkHub | 黑客公园枢纽
* Website | https://www.hackerparkhub.org/
* Description | 以无所畏惧的探索精神 开创未知技术与对技术的崇拜
* GeekDeveloper : JEEP-711
*
* @author system
* <p>
* FileStreamAction
* <p>
*/
object FileStreamAction {
def main(args: Array[String]): Unit = {
// 创建 SparkConf
val sc: SparkConf = new SparkConf().setMaster("local[*]").setAppName("FileStreamAction")
//创建 StreamingContext
val ssc = new StreamingContext(sc, Seconds(3))
// 监控文件夹 DStream
val fileDStream: DStream[String] = ssc.textFileStream("hdfs://systemhub511:9000/core_flow/spark/filestream/")
// 输出日志信息
fileDStream.print()
// 启动流式任务
ssc.start()
ssc.awaitTermination()
}
}
4.启动程序
-------------------------------------------
Time: 1559566113000 ms
-------------------------------------------
-------------------------------------------
Time: 1559566116000 ms
-------------------------------------------
-------------------------------------------
Time: 1559566119000 ms
-------------------------------------------
5.上传文件
[root@systemhub511 spark]# hadoop fs -put /opt/module/datas/spark_flow/filestream/a.txt /core_flow/spark/filestream/
[root@systemhub511 spark]# hadoop fs -put /opt/module/datas/spark_flow/filestream/b.txt /core_flow/spark/filestream/
[root@systemhub511 spark]# hadoop fs -put /opt/module/datas/spark_flow/filestream/c.txt /core_flow/spark/filestream/
6.查看日志信息
-------------------------------------------
Time: 1559566146000 ms
-------------------------------------------
SparkStreaming
SparkStreaming
SparkStream
DStream
-------------------------------------------
Time: 1559566155000 ms
-------------------------------------------
textFileStream
textFileStream
StreamingContext
-------------------------------------------
Time: 1559566164000 ms
-------------------------------------------
awaitTermination
hadoop
测试过程中,可以通过使用
ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的RDD,都会作为一个DStream处理.
1.需求 : 循环创建RDD,将RDD放入队列,通过SparkStream创建Dstream计算WordCoun.
2.Create QueuStreamAction.scala
package com.geekparkhub.core.spark.application.datastream
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
/**
* Geek International Park | 极客国际公园
* GeekParkHub | 极客实验室
* Website | https://www.geekparkhub.com/
* Description | Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见
* HackerParkHub | 黑客公园枢纽
* Website | https://www.hackerparkhub.org/
* Description | 以无所畏惧的探索精神 开创未知技术与对技术的崇拜
* GeekDeveloper : JEEP-711
*
* @author system
* <p>
* QueuStreamAction
* <p>
*/
object QueuStreamAction {
def main(args: Array[String]): Unit = {
// 创建 SparkConf
val sc: SparkConf = new SparkConf().setMaster("local[*]").setAppName("QueuStreamAction")
//创建 StreamingContext
val ssc = new StreamingContext(sc, Seconds(3))
// 创建RDD队列
val rddQueue = new mutable.Queue[RDD[Int]]()
// 创建 rddDStream
val rddDStream: InputDStream[Int] = ssc.queueStream(rddQueue,false)
// 统计计算
val result: DStream[Int] = rddDStream.reduce(_ + _)
// 输出日志信息
result.print()
// 启动流式任务
ssc.start()
// 循环创建RDD
for (i <- 1 to 5) {
rddQueue += ssc.sparkContext.makeRDD(1 to 100, 10)
Thread.sleep(2000)
}
ssc.awaitTermination()
}
}
3.启动程序 查看日志信息
-------------------------------------------
Time: 1559567436000 ms
-------------------------------------------
5050
-------------------------------------------
Time: 1559567439000 ms
-------------------------------------------
10100
-------------------------------------------
Time: 1559567442000 ms
-------------------------------------------
5050
-------------------------------------------
Time: 1559567445000 ms
-------------------------------------------
5050
需要继承Receiver,并实现onStart & onStop方法来自定义数据源采集.
需求 : 自定义数据源,实现监控某个端口号,获取该端口号内容.
1.Create CustomizeReceiver.scala
package com.geekparkhub.core.spark.application.datastream
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
/**
* Geek International Park | 极客国际公园
* GeekParkHub | 极客实验室
* Website | https://www.geekparkhub.com/
* Description | Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见
* HackerParkHub | 黑客公园枢纽
* Website | https://www.hackerparkhub.org/
* Description | 以无所畏惧的探索精神 开创未知技术与对技术的崇拜
* GeekDeveloper : JEEP-711
*
* @author system
* <p>
* CustomizeReceiver
* <p>
*/
class CustomizeReceiver(hostName: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
// 开始读取数据
override def onStart(): Unit = {
new Thread("receiver") {
override def run(): Unit = {
receiver()
}
}.start()
}
// 读取数据
def receiver(): Unit = {
try {
// 创建 Socket
val socket = new Socket(hostName, port)
// 定义变量,用来接收端口传过来的数据
var input: String = null
// 创建BufferedReader用于读取端口传来的数据
val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
// 赋值
input = reader.readLine()
while (input != null) {
store(input)
input = reader.readLine()
}
// 跳出循环则关闭资源
reader.close()
socket.close()
// 重启流式任务
restart("restart")
} catch {
case e: Exception => restart("restart")
}
}
// 结束读取数据
override def onStop(): Unit = {}
}
2.Create CustomizeReceiverAction.scala
package com.geekparkhub.core.spark.application.datastream
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Geek International Park | 极客国际公园
* GeekParkHub | 极客实验室
* Website | https://www.geekparkhub.com/
* Description | Open开放 · Creation创想 | OpenSource开放成就梦想 GeekParkHub共建前所未见
* HackerParkHub | 黑客公园枢纽
* Website | https://www.hackerparkhub.org/
* Description | 以无所畏惧的探索精神 开创未知技术与对技术的崇拜
* GeekDeveloper : JEEP-711
*
* @author system
* <p>
* CustomizeReceiverAction
* <p>
*/
object CustomizeReceiverAction {
def main(args: Array[String]): Unit = {
// 创建 SparkConf
val sc: SparkConf = new SparkConf().setMaster("local[*]").setAppName("CustomizeReceiverAction")
//创建 StreamingContext
val ssc = new StreamingContext(sc, Seconds(3))
val lineDStream: ReceiverInputDStream[String] = ssc.receiverStream(new CustomizeReceiver("systemhub511", 9999))
// 将行数据转换为单词
val wordDStream: DStream[String] = lineDStream.flatMap(_.split(" "))
// 将单词住转换为元祖
val wordAndOneDStream: DStream[(String, Int)] = wordDStream.map((_, 1))
// 统计单词出现个数
val DStreamResult: DStream[(String, Int)] = wordAndOneDStream.reduceByKey(_ + _)
// 输出日志信息
DStreamResult.print()
// 启动流式任务
ssc.start()
ssc.awaitTermination()
}
}
3.启动程序并通过NetCat发送数据
[root@systemhub511 spark]# nc -lk 9999CustomizeReceiverAction
CustomizeReceiverAction
CustomizeReceiver
CustomizeReceiver
CustomizeReceiv
4.查看日志信息
-------------------------------------------
Time: 1559570220000 ms
-------------------------------------------
Time: 1559570226000 ms
-------------------------------------------
(CustomizeReceiverAction,1)
-------------------------------------------
Time: 1559570229000 ms
-------------------------------------------
(CustomizeReceiverAction,1)
(CustomizeReceiver,1)
-------------------------------------------
Time: 1559570232000 ms
-------------------------------------------
(CustomizeReceiver,1)
-------------------------------------------
Time: 1559570238000 ms
-------------------------------------------
(CustomizeReceiv,1)
在工程中需要引入Maven工件spark-streaming-kafka_2.10来使用它,包内提供的KafkaUtils对象可以在StreamingContext和JavaStreamingContext中以Kafka消息创建出DStream.
由于KafkaUtils可以订阅多个主题,因此它创建出的DStream由成对的主题和消息组成,要创建出一个流数据,需要使用StreamingContext实例、一个由逗号隔开的ZooKeeper主机列表字符串、消费者组的名字(唯一名字),以及一个从主题到针对这个主题的接收器线程数的映射表来调用createStream()方法.
需求 : 通过SparkStreaming从Kafka读取数据,并将读取过来数据做简单计算(WordCount),最终将信息打印至控制台.
1.追加依赖信息
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.geekparkhub.core.spark</groupId>
<artifactId>spark_server</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>spark-common</module>
<module>spark-core</module>
<module>spark-sql</module>
<module>spark-streaming</module>
</modules>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.15</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.11</artifactId>
<version>1.6.3</version>
</dependency>
</dependencies>
</project>
Create KafkaSparkStreamingAction.scala
DStream上的原语与RDD类似,分为Transformations(转换)和Output Operations()输出)两种,此外转换操作中还有一些比较特殊原语,如 : updateStateByKey()、transform()以及各种Window相关原语.
Blog内容大多是手敲,所以难免会有笔误,你可以帮我找错别字。
很多知识点我可能没有涉及到,所以你可以对其他知识点进行补充。
现有的知识点难免存在不完善或者错误,所以你可以对已有知识点的修改/补充。
💡欢迎贡献各领域开源野生Blog&笔记&文章&片段&分享&创想&OpenSource Project&Code&Code Review
🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈 issues: geekparkhub.github.io/issues 🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈🙈
FaceBook:JEEP SevenEleven
Twitter:@JEEP7ll
Sina Weibo: @JEEP-711
GeekParkHub GithubHome:https://github.com/geekparkhub
GeekParkHub GiteeHome:https://gitee.com/geekparkhub
Blog GardenHome:http://www.cnblogs.com/JEEP711/
W3C/BlogHome:https://www.w3cschool.cn/jeep711blog/
CSDN/BlogHome:http://blog.csdn.net/jeep911
51CTO/BlogHome:http://jeep711.blog.51cto.com/
Official Public Email
Group Email:geekparkhub@outlook.com —— hackerparkhub@outlook.com —— hackerpark@hotmail.com
User Email:jeep711.home.@gmail.com —— jeep-711@outlook.com
System Email:systemhub-711@outlook.com
Service Email:servicehub-711@outlook.com
致谢:捐助时请备注 UserName
| ID | UserName | Donation | Money | Consume |
| 1 | Object | WeChatPay | 5RMB | 一杯可乐 |
| 2 | 泰迪熊看月亮 | AliPay | 20RMB | 一杯咖啡 |
| 3 | 修仙道长 | WeChatPay | 10RMB | 两杯可乐 |